From 682c807f05c97792f3e610f2ab33545bb19f1a38 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 6 Sep 2019 16:41:05 -0700 Subject: [PATCH 1/6] Remove chttp2 combiner and replace it with a simple mutex. --- .../chttp2/transport/chttp2_transport.cc | 148 +++++++++++------- .../chttp2/transport/hpack_parser.cc | 9 +- .../ext/transport/chttp2/transport/internal.h | 4 +- .../ext/transport/chttp2/transport/writing.cc | 2 +- src/core/lib/iomgr/closure.h | 39 +++++ 5 files changed, 140 insertions(+), 62 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 5f5c480f9dc..91b6a25d213 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -198,8 +198,6 @@ grpc_chttp2_transport::~grpc_chttp2_transport() { grpc_chttp2_stream_map_destroy(&stream_map); grpc_connectivity_state_destroy(&channel_callback.state_tracker); - GRPC_COMBINER_UNREF(combiner, "chttp2_transport"); - cancel_pings(this, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed")); @@ -214,6 +212,7 @@ grpc_chttp2_transport::~grpc_chttp2_transport() { GRPC_ERROR_UNREF(closed_with_error); gpr_free(ping_acks); gpr_free(peer_string); + gpr_mu_destroy(&mu); } static const grpc_transport_vtable* get_vtable(void); @@ -394,32 +393,29 @@ static bool read_channel_args(grpc_chttp2_transport* t, static void init_transport_closures(grpc_chttp2_transport* t) { GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, - destructive_reclaimer_locked, t, - grpc_combiner_scheduler(t->combiner)); + destructive_reclaimer_locked, t, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, retry_initiate_ping_locked, - t, grpc_combiner_scheduler(t->combiner)); + t, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, next_bdp_ping_timer_expired_locked, t, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, - t, grpc_combiner_scheduler(t->combiner)); + t, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, - start_keepalive_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); + start_keepalive_ping_locked, t, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, - finish_keepalive_ping_locked, t, - grpc_combiner_scheduler(t->combiner)); + finish_keepalive_ping_locked, t, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, keepalive_watchdog_fired_locked, t, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); } static void init_transport_keepalive_settings(grpc_chttp2_transport* t) { @@ -474,13 +470,13 @@ grpc_chttp2_transport::grpc_chttp2_transport( ep(ep), peer_string(grpc_endpoint_get_peer(ep)), resource_user(resource_user), - combiner(grpc_combiner_create()), is_client(is_client), next_stream_id(is_client ? 1 : 2), deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) { GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); base.vtable = get_vtable(); + gpr_mu_init(&mu); /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet large enough that the exponential growth should happen nicely when it's @@ -561,11 +557,13 @@ grpc_chttp2_transport::grpc_chttp2_transport( static void destroy_transport_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); + gpr_mu_lock(&t->mu); t->destroying = 1; close_transport_locked( t, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); + gpr_mu_unlock(&t->mu); // Must be the last line. GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy"); } @@ -573,7 +571,7 @@ static void destroy_transport_locked(void* tp, grpc_error* error) { static void destroy_transport(grpc_transport* gt) { grpc_chttp2_transport* t = reinterpret_cast(gt); GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, - grpc_combiner_scheduler(t->combiner)), + grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } @@ -687,12 +685,13 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_slice_buffer_init(&flow_controlled_buffer); GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, - grpc_combiner_scheduler(t->combiner)); + grpc_schedule_on_exec_ctx); } grpc_chttp2_stream::~grpc_chttp2_stream() { + gpr_mu_lock(&t->mu); if (t->channelz_socket != nullptr) { if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) { t->channelz_socket->RecordStreamSucceeded(); @@ -743,7 +742,7 @@ grpc_chttp2_stream::~grpc_chttp2_stream() { if (t->resource_user != nullptr) { grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE); } - + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream"); GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE); } @@ -766,7 +765,6 @@ static void destroy_stream_locked(void* sp, grpc_error* error) { static void destroy_stream(grpc_transport* gt, grpc_stream* gs, grpc_closure* then_schedule_closure) { GPR_TIMER_SCOPE("destroy_stream", 0); - grpc_chttp2_transport* t = reinterpret_cast(gt); grpc_chttp2_stream* s = reinterpret_cast(gs); if (s->stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS && @@ -784,7 +782,7 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs, s->destroy_stream_arg = then_schedule_closure; GRPC_CLOSURE_SCHED( GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, - grpc_combiner_scheduler(t->combiner)), + grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } @@ -947,11 +945,10 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t, * Also, 'write_action_begin_locked' only gathers the bytes into outbuf. * It does not call the endpoint to write the bytes. That is done by the * 'write_action' (which is scheduled by 'write_action_begin_locked') */ - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&t->write_action_begin_locked, - write_action_begin_locked, t, - grpc_combiner_finally_scheduler(t->combiner)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&t->write_action_begin_locked, + write_action_begin_locked, t, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); break; case GRPC_CHTTP2_WRITE_STATE_WRITING: set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, @@ -1019,6 +1016,7 @@ static const char* begin_writing_desc(bool partial, bool inlined) { static void write_action_begin_locked(void* gt, grpc_error* error_ignored) { GPR_TIMER_SCOPE("write_action_begin_locked", 0); grpc_chttp2_transport* t = static_cast(gt); + gpr_mu_lock(&t->mu); GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); grpc_chttp2_begin_write_result r; if (t->closed_with_error != GRPC_ERROR_NONE) { @@ -1059,9 +1057,11 @@ static void write_action_begin_locked(void* gt, grpc_error* error_ignored) { t->reading_paused_on_pending_induced_frames = false; continue_read_action_locked(t); } + gpr_mu_unlock(&t->mu); } else { GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN(); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); } } @@ -1074,7 +1074,7 @@ static void write_action(void* gt, grpc_error* error) { grpc_endpoint_write( t->ep, &t->outbuf, GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t, - grpc_combiner_scheduler(t->combiner)), + grpc_schedule_on_exec_ctx), cl); } @@ -1083,7 +1083,7 @@ static void write_action(void* gt, grpc_error* error) { static void write_action_end_locked(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("terminate_writing_with_lock", 0); grpc_chttp2_transport* t = static_cast(tp); - + gpr_mu_lock(&t->mu); bool closed = false; if (error != GRPC_ERROR_NONE) { close_transport_locked(t, GRPC_ERROR_REF(error)); @@ -1119,15 +1119,15 @@ static void write_action_end_locked(void* tp, grpc_error* error) { if (!closed) { GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); } - GRPC_CLOSURE_RUN( - GRPC_CLOSURE_INIT(&t->write_action_begin_locked, - write_action_begin_locked, t, - grpc_combiner_finally_scheduler(t->combiner)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&t->write_action_begin_locked, + write_action_begin_locked, t, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); break; } grpc_chttp2_end_write(t, GRPC_ERROR_REF(error)); + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); } @@ -1389,6 +1389,7 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, static void complete_fetch_locked(void* gs, grpc_error* error) { grpc_chttp2_stream* s = static_cast(gs); grpc_chttp2_transport* t = s->t; + gpr_mu_lock(&t->mu); if (error == GRPC_ERROR_NONE) { error = s->fetching_send_message->Pull(&s->fetching_slice); if (error == GRPC_ERROR_NONE) { @@ -1400,6 +1401,7 @@ static void complete_fetch_locked(void* gs, grpc_error* error) { s->fetching_send_message.reset(); grpc_chttp2_cancel_stream(t, s, error); } + gpr_mu_unlock(&t->mu); } static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id, @@ -1425,6 +1427,7 @@ static void perform_stream_op_locked(void* stream_op, static_cast(op->handler_private.extra_arg); grpc_transport_stream_op_batch_payload* op_payload = op->payload; grpc_chttp2_transport* t = s->t; + gpr_mu_lock(&t->mu); GRPC_STATS_INC_HTTP2_OP_BATCHES(); @@ -1705,7 +1708,7 @@ static void perform_stream_op_locked(void* stream_op, grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE, "op->on_complete"); } - + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op"); } @@ -1738,7 +1741,7 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, op->handler_private.extra_arg = gs; GRPC_CLOSURE_SCHED( GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked, - op, grpc_combiner_scheduler(t->combiner)), + op, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } @@ -1749,7 +1752,11 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { GPR_ASSERT(error != GRPC_ERROR_NONE); for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); - GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); + if (j == GRPC_CHTTP2_PCL_INITIATE) { + GRPC_CLOSURE_LIST_RUN(&pq->lists[j]); + } else { + GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); + } } GRPC_ERROR_UNREF(error); } @@ -1777,8 +1784,8 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { if (t->closed_with_error != GRPC_ERROR_NONE) { GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_REF(t->closed_with_error)); - GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked, - GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping_locked, + GRPC_ERROR_REF(t->closed_with_error)); return; } grpc_chttp2_ping_queue* pq = &t->ping_queue; @@ -1797,10 +1804,12 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { static void retry_initiate_ping_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); + gpr_mu_lock(&t->mu); t->ping_state.is_delayed_ping_timer_set = false; if (error == GRPC_ERROR_NONE) { grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); } + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked"); } @@ -1854,7 +1863,7 @@ static void perform_transport_op_locked(void* stream_op, grpc_transport_op* op = static_cast(stream_op); grpc_chttp2_transport* t = static_cast(op->handler_private.extra_arg); - + gpr_mu_lock(&t->mu); if (op->goaway_error) { send_goaway(t, op->goaway_error); } @@ -1888,8 +1897,8 @@ static void perform_transport_op_locked(void* stream_op, close_transport_locked(t, op->disconnect_with_error); } - GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE); - + GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op"); } @@ -1904,7 +1913,7 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_transport_op_locked, op, - grpc_combiner_scheduler(t->combiner)), + grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } @@ -2550,7 +2559,7 @@ static void read_action_locked(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("reading_action_locked", 0); grpc_chttp2_transport* t = static_cast(tp); - + gpr_mu_lock(&t->mu); GRPC_ERROR_REF(error); grpc_error* err = error; @@ -2634,7 +2643,9 @@ static void read_action_locked(void* tp, grpc_error* error) { } else { continue_read_action_locked(t); } + gpr_mu_unlock(&t->mu); } else { + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action"); } @@ -2656,6 +2667,8 @@ static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) { static void start_bdp_ping_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); + // No need to take a lock. Closure scheduler will already have a lock. + // gpr_mu_lock(&t->mu); if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string, grpc_error_string(error)); @@ -2668,15 +2681,18 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) { grpc_timer_cancel(&t->keepalive_ping_timer); } t->flow_control->bdp_estimator()->StartPing(); + // gpr_mu_unlock(&t->mu); } static void finish_bdp_ping_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); + gpr_mu_lock(&t->mu); if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string, grpc_error_string(error)); } if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) { + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); return; } @@ -2687,17 +2703,21 @@ static void finish_bdp_ping_locked(void* tp, grpc_error* error) { t->have_next_bdp_ping_timer = true; grpc_timer_init(&t->next_bdp_ping_timer, next_ping, &t->next_bdp_ping_timer_expired_locked); + gpr_mu_unlock(&t->mu); } static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); + gpr_mu_lock(&t->mu); GPR_ASSERT(t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = false; if (error != GRPC_ERROR_NONE) { + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); return; } schedule_bdp_ping_locked(t); + gpr_mu_unlock(&t->mu); } void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, @@ -2769,6 +2789,7 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, static void init_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); + gpr_mu_lock(&t->mu); GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; @@ -2793,6 +2814,7 @@ static void init_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, &t->init_keepalive_ping_locked); } + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); } @@ -2801,6 +2823,8 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { if (error != GRPC_ERROR_NONE) { return; } + // No need to take a lock. + // gpr_mu_lock(&t->mu); if (t->channelz_socket != nullptr) { t->channelz_socket->RecordKeepaliveSent(); } @@ -2811,10 +2835,12 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_timer_init(&t->keepalive_watchdog_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, &t->keepalive_watchdog_fired_locked); + // gpr_mu_unlock(&t->mu); } static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); + gpr_mu_lock(&t->mu); if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (error == GRPC_ERROR_NONE) { if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { @@ -2828,11 +2854,13 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { &t->init_keepalive_ping_locked); } } + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end"); } static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); + gpr_mu_lock(&t->mu); if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (error == GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.", @@ -2852,6 +2880,7 @@ static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) { t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); } } + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); } @@ -2890,6 +2919,7 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* gs, static void reset_byte_stream(void* arg, grpc_error* error) { grpc_chttp2_stream* s = static_cast(arg); + gpr_mu_lock(&s->t->mu); s->pending_byte_stream = false; if (error == GRPC_ERROR_NONE) { grpc_chttp2_maybe_complete_recv_message(s->t, s); @@ -2903,6 +2933,7 @@ static void reset_byte_stream(void* arg, grpc_error* error) { grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error)); s->byte_stream_error = GRPC_ERROR_REF(error); } + gpr_mu_unlock(&s->t->mu); } namespace grpc_core { @@ -2924,25 +2955,28 @@ void Chttp2IncomingByteStream::OrphanLocked(void* arg, Chttp2IncomingByteStream* bs = static_cast(arg); grpc_chttp2_stream* s = bs->stream_; grpc_chttp2_transport* t = s->t; + gpr_mu_lock(&t->mu); bs->Unref(); s->pending_byte_stream = false; grpc_chttp2_maybe_complete_recv_message(t, s); grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); + gpr_mu_unlock(&t->mu); } void Chttp2IncomingByteStream::Orphan() { GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&destroy_action_, - &Chttp2IncomingByteStream::OrphanLocked, this, - grpc_combiner_scheduler(transport_->combiner)), - GRPC_ERROR_NONE); + OrphanLocked(this, GRPC_ERROR_NONE); + /*GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&destroy_action_, + &Chttp2IncomingByteStream::OrphanLocked, + this, grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE);*/ } void Chttp2IncomingByteStream::NextLocked(void* arg, grpc_error* error_ignored) { Chttp2IncomingByteStream* bs = static_cast(arg); grpc_chttp2_transport* t = bs->transport_; + gpr_mu_lock(&t->mu); grpc_chttp2_stream* s = bs->stream_; size_t cur_length = s->frame_storage.length; if (!s->read_closed) { @@ -2981,6 +3015,7 @@ void Chttp2IncomingByteStream::NextLocked(void* arg, s->on_next = bs->next_action_.on_complete; } bs->Unref(); + gpr_mu_unlock(&t->mu); } bool Chttp2IncomingByteStream::Next(size_t max_size_hint, @@ -2992,11 +3027,10 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint, Ref(); next_action_.max_size_hint = max_size_hint; next_action_.on_complete = on_complete; - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&next_action_.closure, - &Chttp2IncomingByteStream::NextLocked, this, - grpc_combiner_scheduler(transport_->combiner)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&next_action_.closure, + &Chttp2IncomingByteStream::NextLocked, + this, grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); return false; } } @@ -3124,6 +3158,7 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) { static void benign_reclaimer_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); + gpr_mu_lock(&t->mu); if (error == GRPC_ERROR_NONE && grpc_chttp2_stream_map_size(&t->stream_map) == 0) { /* Channel with no active streams: send a goaway to try and make it @@ -3148,11 +3183,13 @@ static void benign_reclaimer_locked(void* arg, grpc_error* error) { grpc_resource_user_finish_reclamation( grpc_endpoint_get_resource_user(t->ep)); } + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer"); } static void destructive_reclaimer_locked(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); + gpr_mu_lock(&t->mu); size_t n = grpc_chttp2_stream_map_size(&t->stream_map); t->destructive_reclaimer_registered = false; if (error == GRPC_ERROR_NONE && n > 0) { @@ -3179,6 +3216,7 @@ static void destructive_reclaimer_locked(void* arg, grpc_error* error) { grpc_resource_user_finish_reclamation( grpc_endpoint_get_resource_user(t->ep)); } + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer"); } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc index a5142ffd96f..ab9f9e2c9eb 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc @@ -1669,12 +1669,14 @@ static const maybe_complete_func_type maybe_complete_funcs[] = { static void force_client_rst_stream(void* sp, grpc_error* error) { grpc_chttp2_stream* s = static_cast(sp); grpc_chttp2_transport* t = s->t; + gpr_mu_lock(&t->mu); if (!s->write_closed) { grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM); grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE); } + gpr_mu_unlock(&t->mu); GRPC_CHTTP2_STREAM_UNREF(s, "final_rst"); } @@ -1741,10 +1743,9 @@ grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser, however -- it might be that we receive a RST_STREAM following this and can avoid the extra write */ GRPC_CHTTP2_STREAM_REF(s, "final_rst"); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE(force_client_rst_stream, s, - grpc_combiner_finally_scheduler(t->combiner)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(force_client_rst_stream, s, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 6e1db8a5707..784fcf42d31 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -39,6 +39,7 @@ #include "src/core/lib/channel/channelz.h" #include "src/core/lib/compression/stream_compression.h" #include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/timer.h" @@ -294,14 +295,13 @@ struct grpc_chttp2_transport { ~grpc_chttp2_transport(); grpc_transport base; /* must be first */ + gpr_mu mu; grpc_core::RefCount refs; grpc_endpoint* ep; char* peer_string; grpc_resource_user* resource_user; - grpc_combiner* combiner; - grpc_closure* notify_on_receive_settings = nullptr; /** write execution state of the transport */ diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index d6d9e4521f6..38327549ea8 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -104,7 +104,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) { pq->inflight_id = t->ping_ctr; t->ping_ctr++; - GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]); + GRPC_CLOSURE_LIST_RUN(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]); grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT], &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); grpc_slice_buffer_add(&t->outbuf, diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index c7b2e8299b9..94667ae0a69 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -355,4 +355,43 @@ inline void grpc_closure_list_sched(grpc_closure_list* list) { grpc_closure_list_sched(closure_list) #endif +#ifndef NDEBUG +inline void grpc_closure_list_run(const char* file, int line, + grpc_closure_list* list) { +#else +inline void grpc_closure_list_run(grpc_closure_list* list) { +#endif + grpc_closure* c = list->head; + while (c != nullptr) { + grpc_closure* next = c->next_data.next; +#ifndef NDEBUG + if (c->scheduled) { + gpr_log(GPR_ERROR, + "Closure already scheduled. (closure: %p, created: [%s:%d], " + "previously scheduled at: [%s: %d] run?: %s", + c, c->file_created, c->line_created, c->file_initiated, + c->line_initiated, c->run ? "true" : "false"); + abort(); + } + c->scheduled = true; + c->file_initiated = file; + c->line_initiated = line; + c->run = false; + GPR_ASSERT(c->cb != nullptr); +#endif + c->scheduler->vtable->run(c, c->error_data.error); + c = next; + } + list->head = list->tail = nullptr; +} + +/** Schedule all closures in a list to be run. Does not need to be run from a + * safe point. */ +#ifndef NDEBUG +#define GRPC_CLOSURE_LIST_RUN(closure_list) \ + grpc_closure_list_run(__FILE__, __LINE__, closure_list) +#else +#define GRPC_CLOSURE_LIST_RUN(closure_list) grpc_closure_list_run(closure_list) +#endif + #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */ From 64e7131a614fabc57fa54326833f7a7a49513926 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 24 Sep 2019 11:50:30 -0700 Subject: [PATCH 2/6] Add some TODOs --- .../chttp2/transport/chttp2_transport.cc | 29 ++++++++++--------- .../chttp2/transport/hpack_parser.cc | 2 ++ .../ext/transport/chttp2/transport/internal.h | 2 +- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 91b6a25d213..af9e085ce85 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -929,6 +929,10 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t, grpc_chttp2_initiate_write_reason_string(reason)); t->is_first_write_in_batch = true; GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); + // TODO(yashykt) : When we were using combiners, we were using the finally + // version, so that the write action would happen after we were done + // queueing up all the writes that we wanted. Maybe do something similar? + // Keeping the earlier comment for posterity - /* Note that the 'write_action_begin_locked' closure is being scheduled * on the 'finally_scheduler' of t->combiner. This means that * 'write_action_begin_locked' is called only *after* all the other @@ -1119,6 +1123,9 @@ static void write_action_end_locked(void* tp, grpc_error* error) { if (!closed) { GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); } + // TODO(yashykt) : When we were using combiners, we were using the finally + // version, so that the write action would happen after we were done + // queueing up all the writes that we wanted. Maybe do something similar? GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&t->write_action_begin_locked, write_action_begin_locked, t, grpc_schedule_on_exec_ctx), @@ -1755,6 +1762,7 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { if (j == GRPC_CHTTP2_PCL_INITIATE) { GRPC_CLOSURE_LIST_RUN(&pq->lists[j]); } else { + // TODO(yashykt) : Use GRPC_CLOSURE_LIST_RUN for this too. GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); } } @@ -1784,6 +1792,7 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { if (t->closed_with_error != GRPC_ERROR_NONE) { GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_REF(t->closed_with_error)); + // TODO(yashykt) : Change this to GRPC_CLOSURE_RUN too GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping_locked, GRPC_ERROR_REF(t->closed_with_error)); return; @@ -1821,6 +1830,7 @@ void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { gpr_free(from); return; } + // TODO(yashkt) : Change this to GRPC_CLOSURE_LIST_RUN GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); @@ -2667,8 +2677,8 @@ static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) { static void start_bdp_ping_locked(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); - // No need to take a lock. Closure scheduler will already have a lock. - // gpr_mu_lock(&t->mu); + // No need to take a lock. This closure will always be run while already + // holding the lock. if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string, grpc_error_string(error)); @@ -2681,7 +2691,6 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) { grpc_timer_cancel(&t->keepalive_ping_timer); } t->flow_control->bdp_estimator()->StartPing(); - // gpr_mu_unlock(&t->mu); } static void finish_bdp_ping_locked(void* tp, grpc_error* error) { @@ -2823,8 +2832,8 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { if (error != GRPC_ERROR_NONE) { return; } - // No need to take a lock. - // gpr_mu_lock(&t->mu); + // No need to take a lock. This closure will always be run while already + // holding the lock. if (t->channelz_socket != nullptr) { t->channelz_socket->RecordKeepaliveSent(); } @@ -2835,7 +2844,6 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { grpc_timer_init(&t->keepalive_watchdog_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, &t->keepalive_watchdog_fired_locked); - // gpr_mu_unlock(&t->mu); } static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { @@ -2950,8 +2958,7 @@ Chttp2IncomingByteStream::Chttp2IncomingByteStream( stream->byte_stream_error = GRPC_ERROR_NONE; } -void Chttp2IncomingByteStream::OrphanLocked(void* arg, - grpc_error* error_ignored) { +void Chttp2IncomingByteStream::OrphanLocked(void* arg) { Chttp2IncomingByteStream* bs = static_cast(arg); grpc_chttp2_stream* s = bs->stream_; grpc_chttp2_transport* t = s->t; @@ -2965,11 +2972,7 @@ void Chttp2IncomingByteStream::OrphanLocked(void* arg, void Chttp2IncomingByteStream::Orphan() { GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); - OrphanLocked(this, GRPC_ERROR_NONE); - /*GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&destroy_action_, - &Chttp2IncomingByteStream::OrphanLocked, - this, grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE);*/ + OrphanLocked(this); } void Chttp2IncomingByteStream::NextLocked(void* arg, diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc index ab9f9e2c9eb..fe6a689ac29 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc @@ -1742,6 +1742,8 @@ grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser, the stream. Wait until the combiner lock is ready to be released however -- it might be that we receive a RST_STREAM following this and can avoid the extra write */ + // TODO(yashykt) : When we were using combiners, we were using the + // finally version. Maybe do something similar? GRPC_CHTTP2_STREAM_REF(s, "final_rst"); GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(force_client_rst_stream, s, grpc_schedule_on_exec_ctx), diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 784fcf42d31..6f301ad670c 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -254,7 +254,7 @@ class Chttp2IncomingByteStream : public ByteStream { private: static void NextLocked(void* arg, grpc_error* error_ignored); - static void OrphanLocked(void* arg, grpc_error* error_ignored); + static void OrphanLocked(void* arg); void MaybeCreateStreamDecompressionCtx(); From ac82cc422f806c2af5b651c6d8d297e9db7eed81 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 24 Sep 2019 13:21:04 -0700 Subject: [PATCH 3/6] Remove 'locked' from function names where applicable --- .../chttp2/transport/chttp2_transport.cc | 282 ++++++++---------- .../ext/transport/chttp2/transport/internal.h | 26 +- .../ext/transport/chttp2/transport/writing.cc | 2 +- 3 files changed, 130 insertions(+), 180 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index af9e085ce85..b73c1cfa2d7 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -102,14 +102,14 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false, "chttp2_refcount"); /* forward declarations of various callbacks that we'll build closures around */ -static void write_action_begin_locked(void* t, grpc_error* error); +static void write_action_begin(void* t, grpc_error* error); static void write_action(void* t, grpc_error* error); -static void write_action_end_locked(void* t, grpc_error* error); +static void write_action_end(void* t, grpc_error* error); -static void read_action_locked(void* t, grpc_error* error); +static void read_action(void* t, grpc_error* error); static void continue_read_action_locked(grpc_chttp2_transport* t); -static void complete_fetch_locked(void* gs, grpc_error* error); +static void complete_fetch(void* gs, grpc_error* error); /** Set a transport level setting, and push it to our peer */ static void queue_setting_update(grpc_chttp2_transport* t, grpc_chttp2_setting_id id, uint32_t value); @@ -124,8 +124,8 @@ static void connectivity_state_set(grpc_chttp2_transport* t, grpc_connectivity_state state, const char* reason); -static void benign_reclaimer_locked(void* t, grpc_error* error); -static void destructive_reclaimer_locked(void* t, grpc_error* error); +static void benign_reclaimer(void* t, grpc_error* error); +static void destructive_reclaimer(void* t, grpc_error* error); static void post_benign_reclaimer(grpc_chttp2_transport* t); static void post_destructive_reclaimer(grpc_chttp2_transport* t); @@ -135,20 +135,20 @@ static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error); static void schedule_bdp_ping_locked(grpc_chttp2_transport* t); static void start_bdp_ping_locked(void* tp, grpc_error* error); -static void finish_bdp_ping_locked(void* tp, grpc_error* error); -static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error); +static void finish_bdp_ping(void* tp, grpc_error* error); +static void next_bdp_ping_timer_expired(void* tp, grpc_error* error); static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error); static void send_ping_locked(grpc_chttp2_transport* t, grpc_closure* on_initiate, grpc_closure* on_complete); -static void retry_initiate_ping_locked(void* tp, grpc_error* error); +static void retry_initiate_ping(void* tp, grpc_error* error); /** keepalive-relevant functions */ -static void init_keepalive_ping_locked(void* arg, grpc_error* error); +static void init_keepalive_ping(void* arg, grpc_error* error); static void start_keepalive_ping_locked(void* arg, grpc_error* error); -static void finish_keepalive_ping_locked(void* arg, grpc_error* error); -static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error); +static void finish_keepalive_ping(void* arg, grpc_error* error); +static void keepalive_watchdog_fired(void* arg, grpc_error* error); static void reset_byte_stream(void* arg, grpc_error* error); @@ -392,29 +392,26 @@ static bool read_channel_args(grpc_chttp2_transport* t, } static void init_transport_closures(grpc_chttp2_transport* t) { - GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, + GRPC_CLOSURE_INIT(&t->read_action, read_action, t, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&t->benign_reclaimer, benign_reclaimer, t, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, + GRPC_CLOSURE_INIT(&t->destructive_reclaimer, destructive_reclaimer, t, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&t->retry_initiate_ping, retry_initiate_ping, t, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, - destructive_reclaimer_locked, t, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, retry_initiate_ping_locked, - t, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping_locked, t, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, + GRPC_CLOSURE_INIT(&t->finish_bdp_ping, finish_bdp_ping, t, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, - next_bdp_ping_timer_expired_locked, t, + GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired, + next_bdp_ping_timer_expired, t, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&t->init_keepalive_ping, init_keepalive_ping, t, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, - t, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, start_keepalive_ping_locked, t, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, - finish_keepalive_ping_locked, t, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, - keepalive_watchdog_fired_locked, t, + GRPC_CLOSURE_INIT(&t->finish_keepalive_ping, finish_keepalive_ping, t, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired, keepalive_watchdog_fired, t, grpc_schedule_on_exec_ctx); } @@ -455,7 +452,7 @@ static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping_locked); + &t->init_keepalive_ping); } else { /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no inflight keeaplive timers */ @@ -555,8 +552,8 @@ grpc_chttp2_transport::grpc_chttp2_transport( post_benign_reclaimer(this); } -static void destroy_transport_locked(void* tp, grpc_error* error) { - grpc_chttp2_transport* t = static_cast(tp); +static void destroy_transport(grpc_transport* gt) { + grpc_chttp2_transport* t = reinterpret_cast(gt); gpr_mu_lock(&t->mu); t->destroying = 1; close_transport_locked( @@ -568,13 +565,6 @@ static void destroy_transport_locked(void* tp, grpc_error* error) { GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy"); } -static void destroy_transport(grpc_transport* gt) { - grpc_chttp2_transport* t = reinterpret_cast(gt); - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); -} - static void close_transport_locked(grpc_chttp2_transport* t, grpc_error* error) { end_all_the_calls(t, GRPC_ERROR_REF(error)); @@ -684,7 +674,7 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer); grpc_slice_buffer_init(&flow_controlled_buffer); - GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this, + GRPC_CLOSURE_INIT(&complete_fetch, ::complete_fetch, this, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, grpc_schedule_on_exec_ctx); @@ -933,26 +923,26 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t, // version, so that the write action would happen after we were done // queueing up all the writes that we wanted. Maybe do something similar? // Keeping the earlier comment for posterity - - /* Note that the 'write_action_begin_locked' closure is being scheduled + /* Note that the 'write_action_begin' closure is being scheduled * on the 'finally_scheduler' of t->combiner. This means that - * 'write_action_begin_locked' is called only *after* all the other + * 'write_action_begin' is called only *after* all the other * closures (some of which are potentially initiating more writes on the * transport) are executed on the t->combiner. * * The reason for scheduling on finally_scheduler is to make sure we batch - * as many writes as possible. 'write_action_begin_locked' is the function + * as many writes as possible. 'write_action_begin' is the function * that gathers all the relevant bytes (which are at various places in the * grpc_chttp2_transport structure) and append them to 'outbuf' field in * grpc_chttp2_transport thereby batching what would have been potentially * multiple write operations. * - * Also, 'write_action_begin_locked' only gathers the bytes into outbuf. + * Also, 'write_action_begin' only gathers the bytes into outbuf. * It does not call the endpoint to write the bytes. That is done by the - * 'write_action' (which is scheduled by 'write_action_begin_locked') */ - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&t->write_action_begin_locked, - write_action_begin_locked, t, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + * 'write_action' (which is scheduled by 'write_action_begin') */ + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&t->write_action_begin, write_action_begin, t, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); break; case GRPC_CHTTP2_WRITE_STATE_WRITING: set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, @@ -1017,8 +1007,8 @@ static const char* begin_writing_desc(bool partial, bool inlined) { GPR_UNREACHABLE_CODE(return "bad state tuple"); } -static void write_action_begin_locked(void* gt, grpc_error* error_ignored) { - GPR_TIMER_SCOPE("write_action_begin_locked", 0); +static void write_action_begin(void* gt, grpc_error* error_ignored) { + GPR_TIMER_SCOPE("write_action_begin", 0); grpc_chttp2_transport* t = static_cast(gt); gpr_mu_lock(&t->mu); GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); @@ -1075,16 +1065,15 @@ static void write_action(void* gt, grpc_error* error) { grpc_chttp2_transport* t = static_cast(gt); void* cl = t->cl; t->cl = nullptr; - grpc_endpoint_write( - t->ep, &t->outbuf, - GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t, - grpc_schedule_on_exec_ctx), - cl); + grpc_endpoint_write(t->ep, &t->outbuf, + GRPC_CLOSURE_INIT(&t->write_action_end, write_action_end, + t, grpc_schedule_on_exec_ctx), + cl); } /* Callback from the grpc_endpoint after bytes have been written by calling * sendmsg */ -static void write_action_end_locked(void* tp, grpc_error* error) { +static void write_action_end(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("terminate_writing_with_lock", 0); grpc_chttp2_transport* t = static_cast(tp); gpr_mu_lock(&t->mu); @@ -1126,10 +1115,10 @@ static void write_action_end_locked(void* tp, grpc_error* error) { // TODO(yashykt) : When we were using combiners, we were using the finally // version, so that the write action would happen after we were done // queueing up all the writes that we wanted. Maybe do something similar? - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&t->write_action_begin_locked, - write_action_begin_locked, t, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&t->write_action_begin, write_action_begin, t, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); break; } @@ -1380,8 +1369,7 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, } s->fetching_send_message.reset(); return; /* early out */ - } else if (s->fetching_send_message->Next(UINT32_MAX, - &s->complete_fetch_locked)) { + } else if (s->fetching_send_message->Next(UINT32_MAX, &s->complete_fetch)) { grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice); if (error != GRPC_ERROR_NONE) { s->fetching_send_message.reset(); @@ -1393,7 +1381,7 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, } } -static void complete_fetch_locked(void* gs, grpc_error* error) { +static void complete_fetch(void* gs, grpc_error* error) { grpc_chttp2_stream* s = static_cast(gs); grpc_chttp2_transport* t = s->t; gpr_mu_lock(&t->mu); @@ -1424,25 +1412,41 @@ static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id, } } -static void perform_stream_op_locked(void* stream_op, - grpc_error* error_ignored) { - GPR_TIMER_SCOPE("perform_stream_op_locked", 0); - - grpc_transport_stream_op_batch* op = - static_cast(stream_op); - grpc_chttp2_stream* s = - static_cast(op->handler_private.extra_arg); +static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, + grpc_transport_stream_op_batch* op) { + GPR_TIMER_SCOPE("perform_stream_op", 0); + grpc_chttp2_transport* t = reinterpret_cast(gt); + grpc_chttp2_stream* s = reinterpret_cast(gs); grpc_transport_stream_op_batch_payload* op_payload = op->payload; - grpc_chttp2_transport* t = s->t; + + if (!t->is_client) { + if (op->send_initial_metadata) { + grpc_millis deadline = + op_payload->send_initial_metadata.send_initial_metadata->deadline; + GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE); + } + if (op->send_trailing_metadata) { + grpc_millis deadline = + op_payload->send_trailing_metadata.send_trailing_metadata->deadline; + GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE); + } + } + + if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { + char* str = grpc_transport_stream_op_batch_string(op); + gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s, str); + gpr_free(str); + } + gpr_mu_lock(&t->mu); GRPC_STATS_INC_HTTP2_OP_BATCHES(); - s->context = op->payload->context; + s->context = op_payload->context; s->traced = op->is_traced; if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { char* str = grpc_transport_stream_op_batch_string(op); - gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p", str, + gpr_log(GPR_INFO, "perform_stream_op: %s; on_complete = %p", str, op->on_complete); gpr_free(str); if (op->send_initial_metadata) { @@ -1716,40 +1720,6 @@ static void perform_stream_op_locked(void* stream_op, "op->on_complete"); } gpr_mu_unlock(&t->mu); - GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op"); -} - -static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, - grpc_transport_stream_op_batch* op) { - GPR_TIMER_SCOPE("perform_stream_op", 0); - grpc_chttp2_transport* t = reinterpret_cast(gt); - grpc_chttp2_stream* s = reinterpret_cast(gs); - - if (!t->is_client) { - if (op->send_initial_metadata) { - grpc_millis deadline = - op->payload->send_initial_metadata.send_initial_metadata->deadline; - GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE); - } - if (op->send_trailing_metadata) { - grpc_millis deadline = - op->payload->send_trailing_metadata.send_trailing_metadata->deadline; - GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE); - } - } - - if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { - char* str = grpc_transport_stream_op_batch_string(op); - gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s, str); - gpr_free(str); - } - - GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); - op->handler_private.extra_arg = gs; - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked, - op, grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); } static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { @@ -1793,7 +1763,7 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_REF(t->closed_with_error)); // TODO(yashykt) : Change this to GRPC_CLOSURE_RUN too - GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping_locked, + GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping, GRPC_ERROR_REF(t->closed_with_error)); return; } @@ -1802,16 +1772,16 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { /* There is a ping in flight. Add yourself to the inflight closure list. */ GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE); grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT], - &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE); + &t->finish_keepalive_ping, GRPC_ERROR_NONE); return; } grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], &t->start_keepalive_ping_locked, GRPC_ERROR_NONE); grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], - &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE); + &t->finish_keepalive_ping, GRPC_ERROR_NONE); } -static void retry_initiate_ping_locked(void* tp, grpc_error* error) { +static void retry_initiate_ping(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); gpr_mu_lock(&t->mu); t->ping_state.is_delayed_ping_timer_set = false; @@ -1819,7 +1789,7 @@ static void retry_initiate_ping_locked(void* tp, grpc_error* error) { grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); } gpr_mu_unlock(&t->mu); - GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked"); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping"); } void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { @@ -1868,11 +1838,14 @@ void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) { } } -static void perform_transport_op_locked(void* stream_op, - grpc_error* error_ignored) { - grpc_transport_op* op = static_cast(stream_op); - grpc_chttp2_transport* t = - static_cast(op->handler_private.extra_arg); +static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { + grpc_chttp2_transport* t = reinterpret_cast(gt); + if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { + char* msg = grpc_transport_op_string(op); + gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg); + gpr_free(msg); + } + op->handler_private.extra_arg = gt; gpr_mu_lock(&t->mu); if (op->goaway_error) { send_goaway(t, op->goaway_error); @@ -1909,22 +1882,6 @@ static void perform_transport_op_locked(void* stream_op, GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); gpr_mu_unlock(&t->mu); - GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op"); -} - -static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { - grpc_chttp2_transport* t = reinterpret_cast(gt); - if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { - char* msg = grpc_transport_op_string(op); - gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg); - gpr_free(msg); - } - op->handler_private.extra_arg = gt; - GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure, - perform_transport_op_locked, op, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); } /******************************************************************************* @@ -2565,7 +2522,7 @@ static grpc_error* try_http_parsing(grpc_chttp2_transport* t) { return error; } -static void read_action_locked(void* tp, grpc_error* error) { +static void read_action(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("reading_action_locked", 0); grpc_chttp2_transport* t = static_cast(tp); @@ -2664,7 +2621,7 @@ static void read_action_locked(void* tp, grpc_error* error) { static void continue_read_action_locked(grpc_chttp2_transport* t) { const bool urgent = t->goaway_error != GRPC_ERROR_NONE; - grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent); + grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action, urgent); grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr); } @@ -2672,7 +2629,7 @@ static void continue_read_action_locked(grpc_chttp2_transport* t) { // that kicks off finishes, it's unreffed static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) { t->flow_control->bdp_estimator()->SchedulePing(); - send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); + send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping); } static void start_bdp_ping_locked(void* tp, grpc_error* error) { @@ -2693,7 +2650,7 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) { t->flow_control->bdp_estimator()->StartPing(); } -static void finish_bdp_ping_locked(void* tp, grpc_error* error) { +static void finish_bdp_ping(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); gpr_mu_lock(&t->mu); if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { @@ -2711,11 +2668,11 @@ static void finish_bdp_ping_locked(void* tp, grpc_error* error) { GPR_ASSERT(!t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = true; grpc_timer_init(&t->next_bdp_ping_timer, next_ping, - &t->next_bdp_ping_timer_expired_locked); + &t->next_bdp_ping_timer_expired); gpr_mu_unlock(&t->mu); } -static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) { +static void next_bdp_ping_timer_expired(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); gpr_mu_lock(&t->mu); GPR_ASSERT(t->have_next_bdp_ping_timer); @@ -2796,7 +2753,7 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, } } -static void init_keepalive_ping_locked(void* arg, grpc_error* error) { +static void init_keepalive_ping(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); gpr_mu_lock(&t->mu); GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); @@ -2814,14 +2771,14 @@ static void init_keepalive_ping_locked(void* arg, grpc_error* error) { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping_locked); + &t->init_keepalive_ping); } } else if (error == GRPC_ERROR_CANCELLED) { /* The keepalive ping timer may be cancelled by bdp */ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping_locked); + &t->init_keepalive_ping); } gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); @@ -2843,10 +2800,10 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); grpc_timer_init(&t->keepalive_watchdog_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, - &t->keepalive_watchdog_fired_locked); + &t->keepalive_watchdog_fired); } -static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { +static void finish_keepalive_ping(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); gpr_mu_lock(&t->mu); if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { @@ -2859,14 +2816,14 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error* error) { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping_locked); + &t->init_keepalive_ping); } } gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end"); } -static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) { +static void keepalive_watchdog_fired(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); gpr_mu_lock(&t->mu); if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { @@ -2882,7 +2839,7 @@ static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) { } } else { /* The watchdog timer should have been cancelled by - * finish_keepalive_ping_locked. */ + * finish_keepalive_ping. */ if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) { gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); @@ -2958,23 +2915,18 @@ Chttp2IncomingByteStream::Chttp2IncomingByteStream( stream->byte_stream_error = GRPC_ERROR_NONE; } -void Chttp2IncomingByteStream::OrphanLocked(void* arg) { - Chttp2IncomingByteStream* bs = static_cast(arg); - grpc_chttp2_stream* s = bs->stream_; - grpc_chttp2_transport* t = s->t; - gpr_mu_lock(&t->mu); - bs->Unref(); - s->pending_byte_stream = false; - grpc_chttp2_maybe_complete_recv_message(t, s); - grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); - gpr_mu_unlock(&t->mu); -} - void Chttp2IncomingByteStream::Orphan() { GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); - OrphanLocked(this); + grpc_chttp2_transport* t = stream_->t; + gpr_mu_lock(&t->mu); + Unref(); + stream_->pending_byte_stream = false; + grpc_chttp2_maybe_complete_recv_message(t, stream_); + grpc_chttp2_maybe_complete_recv_trailing_metadata(t, stream_); + gpr_mu_unlock(&t->mu); } +// TODO(yashykt) : Merge this with Next void Chttp2IncomingByteStream::NextLocked(void* arg, grpc_error* error_ignored) { Chttp2IncomingByteStream* bs = static_cast(arg); @@ -3146,7 +3098,7 @@ static void post_benign_reclaimer(grpc_chttp2_transport* t) { t->benign_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer"); grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep), - false, &t->benign_reclaimer_locked); + false, &t->benign_reclaimer); } } @@ -3155,11 +3107,11 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) { t->destructive_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep), - true, &t->destructive_reclaimer_locked); + true, &t->destructive_reclaimer); } } -static void benign_reclaimer_locked(void* arg, grpc_error* error) { +static void benign_reclaimer(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); gpr_mu_lock(&t->mu); if (error == GRPC_ERROR_NONE && @@ -3190,7 +3142,7 @@ static void benign_reclaimer_locked(void* arg, grpc_error* error) { GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer"); } -static void destructive_reclaimer_locked(void* arg, grpc_error* error) { +static void destructive_reclaimer(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); gpr_mu_lock(&t->mu); size_t n = grpc_chttp2_stream_map_size(&t->stream_map); @@ -3318,5 +3270,5 @@ void grpc_chttp2_transport_start_reading( gpr_free(read_buffer); } t->notify_on_receive_settings = notify_on_receive_settings; - GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&t->read_action, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 6f301ad670c..a57dadb84ad 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -254,7 +254,6 @@ class Chttp2IncomingByteStream : public ByteStream { private: static void NextLocked(void* arg, grpc_error* error_ignored); - static void OrphanLocked(void* arg); void MaybeCreateStreamDecompressionCtx(); @@ -276,7 +275,6 @@ class Chttp2IncomingByteStream : public ByteStream { size_t max_size_hint; grpc_closure* on_complete; } next_action_; - grpc_closure destroy_action_; }; } // namespace grpc_core @@ -327,11 +325,11 @@ struct grpc_chttp2_transport { /** maps stream id to grpc_chttp2_stream objects */ grpc_chttp2_stream_map stream_map; - grpc_closure write_action_begin_locked; + grpc_closure write_action_begin; grpc_closure write_action; - grpc_closure write_action_end_locked; + grpc_closure write_action_end; - grpc_closure read_action_locked; + grpc_closure read_action; /** incoming read bytes */ grpc_slice_buffer read_buffer; @@ -394,7 +392,7 @@ struct grpc_chttp2_transport { grpc_chttp2_repeated_ping_policy ping_policy; grpc_chttp2_repeated_ping_state ping_state; uint64_t ping_ctr = 0; /* unique id for pings */ - grpc_closure retry_initiate_ping_locked; + grpc_closure retry_initiate_ping; /** ping acks */ size_t ping_ack_count = 0; @@ -444,9 +442,9 @@ struct grpc_chttp2_transport { grpc_chttp2_write_cb* write_cb_pool = nullptr; /* bdp estimator */ - grpc_closure next_bdp_ping_timer_expired_locked; + grpc_closure next_bdp_ping_timer_expired; grpc_closure start_bdp_ping_locked; - grpc_closure finish_bdp_ping_locked; + grpc_closure finish_bdp_ping; /* if non-NULL, close the transport with this error when writes are finished */ @@ -461,9 +459,9 @@ struct grpc_chttp2_transport { /** have we scheduled a destructive cleanup? */ bool destructive_reclaimer_registered = false; /** benign cleanup closure */ - grpc_closure benign_reclaimer_locked; + grpc_closure benign_reclaimer; /** destructive cleanup closure */ - grpc_closure destructive_reclaimer_locked; + grpc_closure destructive_reclaimer; /* next bdp ping timer */ bool have_next_bdp_ping_timer = false; @@ -471,13 +469,13 @@ struct grpc_chttp2_transport { /* keep-alive ping support */ /** Closure to initialize a keepalive ping */ - grpc_closure init_keepalive_ping_locked; + grpc_closure init_keepalive_ping; /** Closure to run when the keepalive ping is sent */ grpc_closure start_keepalive_ping_locked; /** Cousure to run when the keepalive ping ack is received */ - grpc_closure finish_keepalive_ping_locked; + grpc_closure finish_keepalive_ping; /** Closrue to run when the keepalive ping timeouts */ - grpc_closure keepalive_watchdog_fired_locked; + grpc_closure keepalive_watchdog_fired; /** timer to initiate ping events */ grpc_timer keepalive_ping_timer; /** watchdog to kill the transport when waiting for the keepalive ping */ @@ -545,7 +543,7 @@ struct grpc_chttp2_stream { int64_t next_message_end_offset; int64_t flow_controlled_bytes_written = 0; int64_t flow_controlled_bytes_flowed = 0; - grpc_closure complete_fetch_locked; + grpc_closure complete_fetch; grpc_closure* fetching_send_message_finished = nullptr; grpc_metadata_batch* recv_initial_metadata; diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 38327549ea8..4796dee4cf6 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -97,7 +97,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) { t->ping_state.is_delayed_ping_timer_set = true; GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked"); grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping, - &t->retry_initiate_ping_locked); + &t->retry_initiate_ping); } return; } From 0d2558d05f0094fbad2ff7d91b43e9785807777a Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 24 Sep 2019 15:20:09 -0700 Subject: [PATCH 4/6] Use Mutex and MutexLock instead of gpr_mu --- .../chttp2/transport/chttp2_transport.cc | 469 +++++++++--------- .../chttp2/transport/hpack_parser.cc | 16 +- .../ext/transport/chttp2/transport/internal.h | 2 +- 3 files changed, 245 insertions(+), 242 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index b73c1cfa2d7..fa79ee0c127 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -212,7 +212,6 @@ grpc_chttp2_transport::~grpc_chttp2_transport() { GRPC_ERROR_UNREF(closed_with_error); gpr_free(ping_acks); gpr_free(peer_string); - gpr_mu_destroy(&mu); } static const grpc_transport_vtable* get_vtable(void); @@ -473,7 +472,6 @@ grpc_chttp2_transport::grpc_chttp2_transport( GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); base.vtable = get_vtable(); - gpr_mu_init(&mu); /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet large enough that the exponential growth should happen nicely when it's @@ -554,13 +552,14 @@ grpc_chttp2_transport::grpc_chttp2_transport( static void destroy_transport(grpc_transport* gt) { grpc_chttp2_transport* t = reinterpret_cast(gt); - gpr_mu_lock(&t->mu); - t->destroying = 1; - close_transport_locked( - t, grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), - GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); - gpr_mu_unlock(&t->mu); + { + grpc_core::MutexLock lock(&t->mu); + t->destroying = 1; + close_transport_locked( + t, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), + GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); + } // Must be the last line. GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy"); } @@ -681,58 +680,60 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, } grpc_chttp2_stream::~grpc_chttp2_stream() { - gpr_mu_lock(&t->mu); - if (t->channelz_socket != nullptr) { - if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) { - t->channelz_socket->RecordStreamSucceeded(); - } else { - t->channelz_socket->RecordStreamFailed(); + { + grpc_core::MutexLock lock(&t->mu); + if (t->channelz_socket != nullptr) { + if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) { + t->channelz_socket->RecordStreamSucceeded(); + } else { + t->channelz_socket->RecordStreamFailed(); + } } - } - GPR_ASSERT((write_closed && read_closed) || id == 0); - if (id != 0) { - GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr); - } + GPR_ASSERT((write_closed && read_closed) || id == 0); + if (id != 0) { + GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr); + } - grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer); - grpc_slice_buffer_destroy_internal(&frame_storage); - if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) { - grpc_slice_buffer_destroy_internal(&compressed_data_buffer); - } - if (stream_decompression_method != - GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { - grpc_slice_buffer_destroy_internal(&decompressed_data_buffer); - } + grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer); + grpc_slice_buffer_destroy_internal(&frame_storage); + if (stream_compression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) { + grpc_slice_buffer_destroy_internal(&compressed_data_buffer); + } + if (stream_decompression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { + grpc_slice_buffer_destroy_internal(&decompressed_data_buffer); + } - grpc_chttp2_list_remove_stalled_by_transport(t, this); - grpc_chttp2_list_remove_stalled_by_stream(t, this); + grpc_chttp2_list_remove_stalled_by_transport(t, this); + grpc_chttp2_list_remove_stalled_by_stream(t, this); - for (int i = 0; i < STREAM_LIST_COUNT; i++) { - if (GPR_UNLIKELY(included[i])) { - gpr_log(GPR_ERROR, "%s stream %d still included in list %d", - t->is_client ? "client" : "server", id, i); - abort(); + for (int i = 0; i < STREAM_LIST_COUNT; i++) { + if (GPR_UNLIKELY(included[i])) { + gpr_log(GPR_ERROR, "%s stream %d still included in list %d", + t->is_client ? "client" : "server", id, i); + abort(); + } } - } - GPR_ASSERT(send_initial_metadata_finished == nullptr); - GPR_ASSERT(fetching_send_message == nullptr); - GPR_ASSERT(send_trailing_metadata_finished == nullptr); - GPR_ASSERT(recv_initial_metadata_ready == nullptr); - GPR_ASSERT(recv_message_ready == nullptr); - GPR_ASSERT(recv_trailing_metadata_finished == nullptr); - grpc_slice_buffer_destroy_internal(&flow_controlled_buffer); - GRPC_ERROR_UNREF(read_closed_error); - GRPC_ERROR_UNREF(write_closed_error); - GRPC_ERROR_UNREF(byte_stream_error); + GPR_ASSERT(send_initial_metadata_finished == nullptr); + GPR_ASSERT(fetching_send_message == nullptr); + GPR_ASSERT(send_trailing_metadata_finished == nullptr); + GPR_ASSERT(recv_initial_metadata_ready == nullptr); + GPR_ASSERT(recv_message_ready == nullptr); + GPR_ASSERT(recv_trailing_metadata_finished == nullptr); + grpc_slice_buffer_destroy_internal(&flow_controlled_buffer); + GRPC_ERROR_UNREF(read_closed_error); + GRPC_ERROR_UNREF(write_closed_error); + GRPC_ERROR_UNREF(byte_stream_error); - flow_control.Destroy(); + flow_control.Destroy(); - if (t->resource_user != nullptr) { - grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE); + if (t->resource_user != nullptr) { + grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE); + } } - gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream"); GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE); } @@ -1010,7 +1011,7 @@ static const char* begin_writing_desc(bool partial, bool inlined) { static void write_action_begin(void* gt, grpc_error* error_ignored) { GPR_TIMER_SCOPE("write_action_begin", 0); grpc_chttp2_transport* t = static_cast(gt); - gpr_mu_lock(&t->mu); + grpc_core::ReleasableMutexLock lock(&t->mu); GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); grpc_chttp2_begin_write_result r; if (t->closed_with_error != GRPC_ERROR_NONE) { @@ -1051,11 +1052,11 @@ static void write_action_begin(void* gt, grpc_error* error_ignored) { t->reading_paused_on_pending_induced_frames = false; continue_read_action_locked(t); } - gpr_mu_unlock(&t->mu); + lock.Unlock(); } else { GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN(); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); - gpr_mu_unlock(&t->mu); + lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); } } @@ -1076,54 +1077,56 @@ static void write_action(void* gt, grpc_error* error) { static void write_action_end(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("terminate_writing_with_lock", 0); grpc_chttp2_transport* t = static_cast(tp); - gpr_mu_lock(&t->mu); - bool closed = false; - if (error != GRPC_ERROR_NONE) { - close_transport_locked(t, GRPC_ERROR_REF(error)); - closed = true; - } - - if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { - t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; - closed = true; - if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { - close_transport_locked( - t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent")); + { + grpc_core::MutexLock lock(&t->mu); + bool closed = false; + if (error != GRPC_ERROR_NONE) { + close_transport_locked(t, GRPC_ERROR_REF(error)); + closed = true; } - } - switch (t->write_state) { - case GRPC_CHTTP2_WRITE_STATE_IDLE: - GPR_UNREACHABLE_CODE(break); - case GRPC_CHTTP2_WRITE_STATE_WRITING: - GPR_TIMER_MARK("state=writing", 0); - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing"); - break; - case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: - GPR_TIMER_MARK("state=writing_stale_no_poller", 0); - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing"); - t->is_first_write_in_batch = false; - GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); - // If the transport is closed, we will retry writing on the endpoint - // and next write may contain part of the currently serialized frames. - // So, we should only call the run_after_write callbacks when the next - // write finishes, or the callbacks will be invoked when the stream is - // closed. - if (!closed) { - GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); + if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { + t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; + closed = true; + if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + close_transport_locked( + t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent")); } - // TODO(yashykt) : When we were using combiners, we were using the finally - // version, so that the write action would happen after we were done - // queueing up all the writes that we wanted. Maybe do something similar? - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&t->write_action_begin, write_action_begin, t, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); - break; - } + } + + switch (t->write_state) { + case GRPC_CHTTP2_WRITE_STATE_IDLE: + GPR_UNREACHABLE_CODE(break); + case GRPC_CHTTP2_WRITE_STATE_WRITING: + GPR_TIMER_MARK("state=writing", 0); + set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing"); + break; + case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: + GPR_TIMER_MARK("state=writing_stale_no_poller", 0); + set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing"); + t->is_first_write_in_batch = false; + GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); + // If the transport is closed, we will retry writing on the endpoint + // and next write may contain part of the currently serialized frames. + // So, we should only call the run_after_write callbacks when the next + // write finishes, or the callbacks will be invoked when the stream is + // closed. + if (!closed) { + GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); + } + // TODO(yashykt) : When we were using combiners, we were using the + // finally version, so that the write action would happen after we were + // done queueing up all the writes that we wanted. Maybe do something + // similar? + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&t->write_action_begin, write_action_begin, t, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); + break; + } - grpc_chttp2_end_write(t, GRPC_ERROR_REF(error)); - gpr_mu_unlock(&t->mu); + grpc_chttp2_end_write(t, GRPC_ERROR_REF(error)); + } GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); } @@ -1384,7 +1387,7 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, static void complete_fetch(void* gs, grpc_error* error) { grpc_chttp2_stream* s = static_cast(gs); grpc_chttp2_transport* t = s->t; - gpr_mu_lock(&t->mu); + grpc_core::MutexLock lock(&t->mu); if (error == GRPC_ERROR_NONE) { error = s->fetching_send_message->Pull(&s->fetching_slice); if (error == GRPC_ERROR_NONE) { @@ -1396,7 +1399,6 @@ static void complete_fetch(void* gs, grpc_error* error) { s->fetching_send_message.reset(); grpc_chttp2_cancel_stream(t, s, error); } - gpr_mu_unlock(&t->mu); } static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id, @@ -1438,8 +1440,7 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, gpr_free(str); } - gpr_mu_lock(&t->mu); - + grpc_core::MutexLock lock(&t->mu); GRPC_STATS_INC_HTTP2_OP_BATCHES(); s->context = op_payload->context; @@ -1719,7 +1720,6 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE, "op->on_complete"); } - gpr_mu_unlock(&t->mu); } static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { @@ -1783,12 +1783,13 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { static void retry_initiate_ping(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); - gpr_mu_lock(&t->mu); - t->ping_state.is_delayed_ping_timer_set = false; - if (error == GRPC_ERROR_NONE) { - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); + { + grpc_core::MutexLock lock(&t->mu); + t->ping_state.is_delayed_ping_timer_set = false; + if (error == GRPC_ERROR_NONE) { + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); + } } - gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping"); } @@ -1845,8 +1846,7 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg); gpr_free(msg); } - op->handler_private.extra_arg = gt; - gpr_mu_lock(&t->mu); + grpc_core::MutexLock lock(&t->mu); if (op->goaway_error) { send_goaway(t, op->goaway_error); } @@ -1881,7 +1881,6 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { } GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); - gpr_mu_unlock(&t->mu); } /******************************************************************************* @@ -2526,7 +2525,7 @@ static void read_action(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("reading_action_locked", 0); grpc_chttp2_transport* t = static_cast(tp); - gpr_mu_lock(&t->mu); + grpc_core::ReleasableMutexLock lock(&t->mu); GRPC_ERROR_REF(error); grpc_error* err = error; @@ -2610,9 +2609,9 @@ static void read_action(void* tp, grpc_error* error) { } else { continue_read_action_locked(t); } - gpr_mu_unlock(&t->mu); + lock.Unlock(); } else { - gpr_mu_unlock(&t->mu); + lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action"); } @@ -2652,13 +2651,13 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) { static void finish_bdp_ping(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); - gpr_mu_lock(&t->mu); + grpc_core::ReleasableMutexLock lock(&t->mu); if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string, grpc_error_string(error)); } if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) { - gpr_mu_unlock(&t->mu); + lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); return; } @@ -2669,21 +2668,19 @@ static void finish_bdp_ping(void* tp, grpc_error* error) { t->have_next_bdp_ping_timer = true; grpc_timer_init(&t->next_bdp_ping_timer, next_ping, &t->next_bdp_ping_timer_expired); - gpr_mu_unlock(&t->mu); } static void next_bdp_ping_timer_expired(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); - gpr_mu_lock(&t->mu); + grpc_core::ReleasableMutexLock lock(&t->mu); GPR_ASSERT(t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = false; if (error != GRPC_ERROR_NONE) { - gpr_mu_unlock(&t->mu); + lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); return; } schedule_bdp_ping_locked(t); - gpr_mu_unlock(&t->mu); } void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, @@ -2755,32 +2752,34 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, static void init_keepalive_ping(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - gpr_mu_lock(&t->mu); - GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); - if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; - } else if (error == GRPC_ERROR_NONE) { - if (t->keepalive_permit_without_calls || - grpc_chttp2_stream_map_size(&t->stream_map) > 0) { - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; - GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); - grpc_timer_init_unset(&t->keepalive_watchdog_timer); - send_keepalive_ping_locked(t); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); - } else { + { + grpc_core::MutexLock lock(&t->mu); + GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); + if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; + } else if (error == GRPC_ERROR_NONE) { + if (t->keepalive_permit_without_calls || + grpc_chttp2_stream_map_size(&t->stream_map) > 0) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; + GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); + grpc_timer_init_unset(&t->keepalive_watchdog_timer); + send_keepalive_ping_locked(t); + grpc_chttp2_initiate_write(t, + GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); + } else { + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init(&t->keepalive_ping_timer, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + &t->init_keepalive_ping); + } + } else if (error == GRPC_ERROR_CANCELLED) { + /* The keepalive ping timer may be cancelled by bdp */ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, &t->init_keepalive_ping); } - } else if (error == GRPC_ERROR_CANCELLED) { - /* The keepalive ping timer may be cancelled by bdp */ - GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping); } - gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); } @@ -2805,47 +2804,49 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { static void finish_keepalive_ping(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - gpr_mu_lock(&t->mu); - if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { - if (error == GRPC_ERROR_NONE) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { - gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string); + { + grpc_core::MutexLock lock(&t->mu); + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { + gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string); + } + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + grpc_timer_cancel(&t->keepalive_watchdog_timer); + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init(&t->keepalive_ping_timer, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + &t->init_keepalive_ping); } - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; - grpc_timer_cancel(&t->keepalive_watchdog_timer); - GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping); } } - gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end"); } static void keepalive_watchdog_fired(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - gpr_mu_lock(&t->mu); - if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { - if (error == GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.", - t->peer_string); - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; - close_transport_locked( - t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "keepalive watchdog timeout"), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAVAILABLE)); - } - } else { - /* The watchdog timer should have been cancelled by - * finish_keepalive_ping. */ - if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) { - gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", - t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); + { + grpc_core::MutexLock lock(&t->mu); + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.", + t->peer_string); + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; + close_transport_locked( + t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "keepalive watchdog timeout"), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE)); + } + } else { + /* The watchdog timer should have been cancelled by + * finish_keepalive_ping. */ + if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) { + gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", + t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); + } } } - gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); } @@ -2884,7 +2885,7 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* gs, static void reset_byte_stream(void* arg, grpc_error* error) { grpc_chttp2_stream* s = static_cast(arg); - gpr_mu_lock(&s->t->mu); + grpc_core::MutexLock lock(&s->t->mu); s->pending_byte_stream = false; if (error == GRPC_ERROR_NONE) { grpc_chttp2_maybe_complete_recv_message(s->t, s); @@ -2898,7 +2899,6 @@ static void reset_byte_stream(void* arg, grpc_error* error) { grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error)); s->byte_stream_error = GRPC_ERROR_REF(error); } - gpr_mu_unlock(&s->t->mu); } namespace grpc_core { @@ -2917,13 +2917,13 @@ Chttp2IncomingByteStream::Chttp2IncomingByteStream( void Chttp2IncomingByteStream::Orphan() { GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); - grpc_chttp2_transport* t = stream_->t; - gpr_mu_lock(&t->mu); + grpc_chttp2_stream* s = stream_; + grpc_chttp2_transport* t = s->t; + grpc_core::MutexLock lock(&t->mu); Unref(); - stream_->pending_byte_stream = false; - grpc_chttp2_maybe_complete_recv_message(t, stream_); - grpc_chttp2_maybe_complete_recv_trailing_metadata(t, stream_); - gpr_mu_unlock(&t->mu); + s->pending_byte_stream = false; + grpc_chttp2_maybe_complete_recv_message(t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); } // TODO(yashykt) : Merge this with Next @@ -2931,7 +2931,7 @@ void Chttp2IncomingByteStream::NextLocked(void* arg, grpc_error* error_ignored) { Chttp2IncomingByteStream* bs = static_cast(arg); grpc_chttp2_transport* t = bs->transport_; - gpr_mu_lock(&t->mu); + grpc_core::MutexLock lock(&t->mu); grpc_chttp2_stream* s = bs->stream_; size_t cur_length = s->frame_storage.length; if (!s->read_closed) { @@ -2970,7 +2970,6 @@ void Chttp2IncomingByteStream::NextLocked(void* arg, s->on_next = bs->next_action_.on_complete; } bs->Unref(); - gpr_mu_unlock(&t->mu); } bool Chttp2IncomingByteStream::Next(size_t max_size_hint, @@ -3113,65 +3112,67 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) { static void benign_reclaimer(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - gpr_mu_lock(&t->mu); - if (error == GRPC_ERROR_NONE && - grpc_chttp2_stream_map_size(&t->stream_map) == 0) { - /* Channel with no active streams: send a goaway to try and make it - * disconnect cleanly */ - if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { - gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory", - t->peer_string); + { + grpc_core::MutexLock lock(&t->mu); + if (error == GRPC_ERROR_NONE && + grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + /* Channel with no active streams: send a goaway to try and make it + * disconnect cleanly */ + if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { + gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory", + t->peer_string); + } + send_goaway( + t, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), + GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); + } else if (error == GRPC_ERROR_NONE && + GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { + gpr_log(GPR_INFO, + "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR + " streams", + t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map)); + } + t->benign_reclaimer_registered = false; + if (error != GRPC_ERROR_CANCELLED) { + grpc_resource_user_finish_reclamation( + grpc_endpoint_get_resource_user(t->ep)); } - send_goaway(t, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), - GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); - } else if (error == GRPC_ERROR_NONE && - GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { - gpr_log(GPR_INFO, - "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR - " streams", - t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map)); - } - t->benign_reclaimer_registered = false; - if (error != GRPC_ERROR_CANCELLED) { - grpc_resource_user_finish_reclamation( - grpc_endpoint_get_resource_user(t->ep)); - } - gpr_mu_unlock(&t->mu); + } GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer"); } static void destructive_reclaimer(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - gpr_mu_lock(&t->mu); - size_t n = grpc_chttp2_stream_map_size(&t->stream_map); - t->destructive_reclaimer_registered = false; - if (error == GRPC_ERROR_NONE && n > 0) { - grpc_chttp2_stream* s = static_cast( - grpc_chttp2_stream_map_rand(&t->stream_map)); - if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { - gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string, - s->id); - } - grpc_chttp2_cancel_stream( - t, s, - grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), - GRPC_ERROR_INT_HTTP2_ERROR, - GRPC_HTTP2_ENHANCE_YOUR_CALM)); - if (n > 1) { - /* Since we cancel one stream per destructive reclamation, if - there are more streams left, we can immediately post a new - reclaimer in case the resource quota needs to free more - memory */ - post_destructive_reclaimer(t); - } - } - if (error != GRPC_ERROR_CANCELLED) { - grpc_resource_user_finish_reclamation( - grpc_endpoint_get_resource_user(t->ep)); - } - gpr_mu_unlock(&t->mu); + { + grpc_core::MutexLock lock(&t->mu); + size_t n = grpc_chttp2_stream_map_size(&t->stream_map); + t->destructive_reclaimer_registered = false; + if (error == GRPC_ERROR_NONE && n > 0) { + grpc_chttp2_stream* s = static_cast( + grpc_chttp2_stream_map_rand(&t->stream_map)); + if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { + gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string, + s->id); + } + grpc_chttp2_cancel_stream( + t, s, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), + GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); + if (n > 1) { + /* Since we cancel one stream per destructive reclamation, if + there are more streams left, we can immediately post a new + reclaimer in case the resource quota needs to free more + memory */ + post_destructive_reclaimer(t); + } + } + if (error != GRPC_ERROR_CANCELLED) { + grpc_resource_user_finish_reclamation( + grpc_endpoint_get_resource_user(t->ep)); + } + } GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer"); } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc index fe6a689ac29..fb33c841428 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc @@ -1669,14 +1669,16 @@ static const maybe_complete_func_type maybe_complete_funcs[] = { static void force_client_rst_stream(void* sp, grpc_error* error) { grpc_chttp2_stream* s = static_cast(sp); grpc_chttp2_transport* t = s->t; - gpr_mu_lock(&t->mu); - if (!s->write_closed) { - grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR, - &s->stats.outgoing); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM); - grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE); + { + grpc_core::MutexLock lock(&t->mu); + if (!s->write_closed) { + grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR, + &s->stats.outgoing); + grpc_chttp2_initiate_write(t, + GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM); + grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE); + } } - gpr_mu_unlock(&t->mu); GRPC_CHTTP2_STREAM_UNREF(s, "final_rst"); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index a57dadb84ad..ac05368632e 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -293,7 +293,7 @@ struct grpc_chttp2_transport { ~grpc_chttp2_transport(); grpc_transport base; /* must be first */ - gpr_mu mu; + grpc_core::Mutex mu; grpc_core::RefCount refs; grpc_endpoint* ep; char* peer_string; From 7d365e610f1605f691a9bff3b1a1582e1d5d96aa Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 24 Sep 2019 15:58:27 -0700 Subject: [PATCH 5/6] Remove destroy_stream_locked --- .../transport/chttp2/transport/chttp2_transport.cc | 11 +---------- src/core/ext/transport/chttp2/transport/internal.h | 1 - 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index fa79ee0c127..647442eb290 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -747,12 +747,6 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs, return 0; } -static void destroy_stream_locked(void* sp, grpc_error* error) { - GPR_TIMER_SCOPE("destroy_stream", 0); - grpc_chttp2_stream* s = static_cast(sp); - s->~grpc_chttp2_stream(); -} - static void destroy_stream(grpc_transport* gt, grpc_stream* gs, grpc_closure* then_schedule_closure) { GPR_TIMER_SCOPE("destroy_stream", 0); @@ -771,10 +765,7 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs, } s->destroy_stream_arg = then_schedule_closure; - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + s->~grpc_chttp2_stream(); } grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index ac05368632e..314e5fdf650 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -522,7 +522,6 @@ struct grpc_chttp2_stream { explicit Reffer(grpc_chttp2_stream* s); } reffer; - grpc_closure destroy_stream; grpc_closure* destroy_stream_arg; grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; From f7cced1e349241251dad5b83195dccd8fa0f1028 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 27 Sep 2019 11:37:02 -0700 Subject: [PATCH 6/6] Add null check for executor --- src/core/lib/iomgr/executor.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index 387b29b89c0..6846ca0e643 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -467,6 +467,10 @@ void Executor::ShutdownAll() { bool Executor::IsThreaded(ExecutorType executor_type) { GPR_ASSERT(executor_type < ExecutorType::NUM_EXECUTORS); + Executor* executor = executors[static_cast(executor_type)]; + if (executor == nullptr) { + return false; + } return executors[static_cast(executor_type)]->IsThreaded(); }