diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index b73c1cfa2d7..fa79ee0c127 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -212,7 +212,6 @@ grpc_chttp2_transport::~grpc_chttp2_transport() { GRPC_ERROR_UNREF(closed_with_error); gpr_free(ping_acks); gpr_free(peer_string); - gpr_mu_destroy(&mu); } static const grpc_transport_vtable* get_vtable(void); @@ -473,7 +472,6 @@ grpc_chttp2_transport::grpc_chttp2_transport( GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); base.vtable = get_vtable(); - gpr_mu_init(&mu); /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet large enough that the exponential growth should happen nicely when it's @@ -554,13 +552,14 @@ grpc_chttp2_transport::grpc_chttp2_transport( static void destroy_transport(grpc_transport* gt) { grpc_chttp2_transport* t = reinterpret_cast(gt); - gpr_mu_lock(&t->mu); - t->destroying = 1; - close_transport_locked( - t, grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), - GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); - gpr_mu_unlock(&t->mu); + { + grpc_core::MutexLock lock(&t->mu); + t->destroying = 1; + close_transport_locked( + t, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), + GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); + } // Must be the last line. GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy"); } @@ -681,58 +680,60 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, } grpc_chttp2_stream::~grpc_chttp2_stream() { - gpr_mu_lock(&t->mu); - if (t->channelz_socket != nullptr) { - if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) { - t->channelz_socket->RecordStreamSucceeded(); - } else { - t->channelz_socket->RecordStreamFailed(); + { + grpc_core::MutexLock lock(&t->mu); + if (t->channelz_socket != nullptr) { + if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) { + t->channelz_socket->RecordStreamSucceeded(); + } else { + t->channelz_socket->RecordStreamFailed(); + } } - } - GPR_ASSERT((write_closed && read_closed) || id == 0); - if (id != 0) { - GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr); - } + GPR_ASSERT((write_closed && read_closed) || id == 0); + if (id != 0) { + GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr); + } - grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer); - grpc_slice_buffer_destroy_internal(&frame_storage); - if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) { - grpc_slice_buffer_destroy_internal(&compressed_data_buffer); - } - if (stream_decompression_method != - GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { - grpc_slice_buffer_destroy_internal(&decompressed_data_buffer); - } + grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer); + grpc_slice_buffer_destroy_internal(&frame_storage); + if (stream_compression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) { + grpc_slice_buffer_destroy_internal(&compressed_data_buffer); + } + if (stream_decompression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { + grpc_slice_buffer_destroy_internal(&decompressed_data_buffer); + } - grpc_chttp2_list_remove_stalled_by_transport(t, this); - grpc_chttp2_list_remove_stalled_by_stream(t, this); + grpc_chttp2_list_remove_stalled_by_transport(t, this); + grpc_chttp2_list_remove_stalled_by_stream(t, this); - for (int i = 0; i < STREAM_LIST_COUNT; i++) { - if (GPR_UNLIKELY(included[i])) { - gpr_log(GPR_ERROR, "%s stream %d still included in list %d", - t->is_client ? "client" : "server", id, i); - abort(); + for (int i = 0; i < STREAM_LIST_COUNT; i++) { + if (GPR_UNLIKELY(included[i])) { + gpr_log(GPR_ERROR, "%s stream %d still included in list %d", + t->is_client ? "client" : "server", id, i); + abort(); + } } - } - GPR_ASSERT(send_initial_metadata_finished == nullptr); - GPR_ASSERT(fetching_send_message == nullptr); - GPR_ASSERT(send_trailing_metadata_finished == nullptr); - GPR_ASSERT(recv_initial_metadata_ready == nullptr); - GPR_ASSERT(recv_message_ready == nullptr); - GPR_ASSERT(recv_trailing_metadata_finished == nullptr); - grpc_slice_buffer_destroy_internal(&flow_controlled_buffer); - GRPC_ERROR_UNREF(read_closed_error); - GRPC_ERROR_UNREF(write_closed_error); - GRPC_ERROR_UNREF(byte_stream_error); + GPR_ASSERT(send_initial_metadata_finished == nullptr); + GPR_ASSERT(fetching_send_message == nullptr); + GPR_ASSERT(send_trailing_metadata_finished == nullptr); + GPR_ASSERT(recv_initial_metadata_ready == nullptr); + GPR_ASSERT(recv_message_ready == nullptr); + GPR_ASSERT(recv_trailing_metadata_finished == nullptr); + grpc_slice_buffer_destroy_internal(&flow_controlled_buffer); + GRPC_ERROR_UNREF(read_closed_error); + GRPC_ERROR_UNREF(write_closed_error); + GRPC_ERROR_UNREF(byte_stream_error); - flow_control.Destroy(); + flow_control.Destroy(); - if (t->resource_user != nullptr) { - grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE); + if (t->resource_user != nullptr) { + grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE); + } } - gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream"); GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE); } @@ -1010,7 +1011,7 @@ static const char* begin_writing_desc(bool partial, bool inlined) { static void write_action_begin(void* gt, grpc_error* error_ignored) { GPR_TIMER_SCOPE("write_action_begin", 0); grpc_chttp2_transport* t = static_cast(gt); - gpr_mu_lock(&t->mu); + grpc_core::ReleasableMutexLock lock(&t->mu); GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); grpc_chttp2_begin_write_result r; if (t->closed_with_error != GRPC_ERROR_NONE) { @@ -1051,11 +1052,11 @@ static void write_action_begin(void* gt, grpc_error* error_ignored) { t->reading_paused_on_pending_induced_frames = false; continue_read_action_locked(t); } - gpr_mu_unlock(&t->mu); + lock.Unlock(); } else { GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN(); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); - gpr_mu_unlock(&t->mu); + lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); } } @@ -1076,54 +1077,56 @@ static void write_action(void* gt, grpc_error* error) { static void write_action_end(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("terminate_writing_with_lock", 0); grpc_chttp2_transport* t = static_cast(tp); - gpr_mu_lock(&t->mu); - bool closed = false; - if (error != GRPC_ERROR_NONE) { - close_transport_locked(t, GRPC_ERROR_REF(error)); - closed = true; - } - - if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { - t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; - closed = true; - if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { - close_transport_locked( - t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent")); + { + grpc_core::MutexLock lock(&t->mu); + bool closed = false; + if (error != GRPC_ERROR_NONE) { + close_transport_locked(t, GRPC_ERROR_REF(error)); + closed = true; } - } - switch (t->write_state) { - case GRPC_CHTTP2_WRITE_STATE_IDLE: - GPR_UNREACHABLE_CODE(break); - case GRPC_CHTTP2_WRITE_STATE_WRITING: - GPR_TIMER_MARK("state=writing", 0); - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing"); - break; - case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: - GPR_TIMER_MARK("state=writing_stale_no_poller", 0); - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing"); - t->is_first_write_in_batch = false; - GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); - // If the transport is closed, we will retry writing on the endpoint - // and next write may contain part of the currently serialized frames. - // So, we should only call the run_after_write callbacks when the next - // write finishes, or the callbacks will be invoked when the stream is - // closed. - if (!closed) { - GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); + if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { + t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; + closed = true; + if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + close_transport_locked( + t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent")); } - // TODO(yashykt) : When we were using combiners, we were using the finally - // version, so that the write action would happen after we were done - // queueing up all the writes that we wanted. Maybe do something similar? - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&t->write_action_begin, write_action_begin, t, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); - break; - } + } + + switch (t->write_state) { + case GRPC_CHTTP2_WRITE_STATE_IDLE: + GPR_UNREACHABLE_CODE(break); + case GRPC_CHTTP2_WRITE_STATE_WRITING: + GPR_TIMER_MARK("state=writing", 0); + set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing"); + break; + case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: + GPR_TIMER_MARK("state=writing_stale_no_poller", 0); + set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing"); + t->is_first_write_in_batch = false; + GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); + // If the transport is closed, we will retry writing on the endpoint + // and next write may contain part of the currently serialized frames. + // So, we should only call the run_after_write callbacks when the next + // write finishes, or the callbacks will be invoked when the stream is + // closed. + if (!closed) { + GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); + } + // TODO(yashykt) : When we were using combiners, we were using the + // finally version, so that the write action would happen after we were + // done queueing up all the writes that we wanted. Maybe do something + // similar? + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&t->write_action_begin, write_action_begin, t, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); + break; + } - grpc_chttp2_end_write(t, GRPC_ERROR_REF(error)); - gpr_mu_unlock(&t->mu); + grpc_chttp2_end_write(t, GRPC_ERROR_REF(error)); + } GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); } @@ -1384,7 +1387,7 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, static void complete_fetch(void* gs, grpc_error* error) { grpc_chttp2_stream* s = static_cast(gs); grpc_chttp2_transport* t = s->t; - gpr_mu_lock(&t->mu); + grpc_core::MutexLock lock(&t->mu); if (error == GRPC_ERROR_NONE) { error = s->fetching_send_message->Pull(&s->fetching_slice); if (error == GRPC_ERROR_NONE) { @@ -1396,7 +1399,6 @@ static void complete_fetch(void* gs, grpc_error* error) { s->fetching_send_message.reset(); grpc_chttp2_cancel_stream(t, s, error); } - gpr_mu_unlock(&t->mu); } static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id, @@ -1438,8 +1440,7 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, gpr_free(str); } - gpr_mu_lock(&t->mu); - + grpc_core::MutexLock lock(&t->mu); GRPC_STATS_INC_HTTP2_OP_BATCHES(); s->context = op_payload->context; @@ -1719,7 +1720,6 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE, "op->on_complete"); } - gpr_mu_unlock(&t->mu); } static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { @@ -1783,12 +1783,13 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { static void retry_initiate_ping(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); - gpr_mu_lock(&t->mu); - t->ping_state.is_delayed_ping_timer_set = false; - if (error == GRPC_ERROR_NONE) { - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); + { + grpc_core::MutexLock lock(&t->mu); + t->ping_state.is_delayed_ping_timer_set = false; + if (error == GRPC_ERROR_NONE) { + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); + } } - gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping"); } @@ -1845,8 +1846,7 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg); gpr_free(msg); } - op->handler_private.extra_arg = gt; - gpr_mu_lock(&t->mu); + grpc_core::MutexLock lock(&t->mu); if (op->goaway_error) { send_goaway(t, op->goaway_error); } @@ -1881,7 +1881,6 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { } GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); - gpr_mu_unlock(&t->mu); } /******************************************************************************* @@ -2526,7 +2525,7 @@ static void read_action(void* tp, grpc_error* error) { GPR_TIMER_SCOPE("reading_action_locked", 0); grpc_chttp2_transport* t = static_cast(tp); - gpr_mu_lock(&t->mu); + grpc_core::ReleasableMutexLock lock(&t->mu); GRPC_ERROR_REF(error); grpc_error* err = error; @@ -2610,9 +2609,9 @@ static void read_action(void* tp, grpc_error* error) { } else { continue_read_action_locked(t); } - gpr_mu_unlock(&t->mu); + lock.Unlock(); } else { - gpr_mu_unlock(&t->mu); + lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action"); } @@ -2652,13 +2651,13 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) { static void finish_bdp_ping(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); - gpr_mu_lock(&t->mu); + grpc_core::ReleasableMutexLock lock(&t->mu); if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string, grpc_error_string(error)); } if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) { - gpr_mu_unlock(&t->mu); + lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); return; } @@ -2669,21 +2668,19 @@ static void finish_bdp_ping(void* tp, grpc_error* error) { t->have_next_bdp_ping_timer = true; grpc_timer_init(&t->next_bdp_ping_timer, next_ping, &t->next_bdp_ping_timer_expired); - gpr_mu_unlock(&t->mu); } static void next_bdp_ping_timer_expired(void* tp, grpc_error* error) { grpc_chttp2_transport* t = static_cast(tp); - gpr_mu_lock(&t->mu); + grpc_core::ReleasableMutexLock lock(&t->mu); GPR_ASSERT(t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = false; if (error != GRPC_ERROR_NONE) { - gpr_mu_unlock(&t->mu); + lock.Unlock(); GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); return; } schedule_bdp_ping_locked(t); - gpr_mu_unlock(&t->mu); } void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, @@ -2755,32 +2752,34 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, static void init_keepalive_ping(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - gpr_mu_lock(&t->mu); - GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); - if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; - } else if (error == GRPC_ERROR_NONE) { - if (t->keepalive_permit_without_calls || - grpc_chttp2_stream_map_size(&t->stream_map) > 0) { - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; - GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); - grpc_timer_init_unset(&t->keepalive_watchdog_timer); - send_keepalive_ping_locked(t); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); - } else { + { + grpc_core::MutexLock lock(&t->mu); + GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); + if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; + } else if (error == GRPC_ERROR_NONE) { + if (t->keepalive_permit_without_calls || + grpc_chttp2_stream_map_size(&t->stream_map) > 0) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; + GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); + grpc_timer_init_unset(&t->keepalive_watchdog_timer); + send_keepalive_ping_locked(t); + grpc_chttp2_initiate_write(t, + GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); + } else { + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init(&t->keepalive_ping_timer, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + &t->init_keepalive_ping); + } + } else if (error == GRPC_ERROR_CANCELLED) { + /* The keepalive ping timer may be cancelled by bdp */ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, &t->init_keepalive_ping); } - } else if (error == GRPC_ERROR_CANCELLED) { - /* The keepalive ping timer may be cancelled by bdp */ - GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping); } - gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); } @@ -2805,47 +2804,49 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { static void finish_keepalive_ping(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - gpr_mu_lock(&t->mu); - if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { - if (error == GRPC_ERROR_NONE) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { - gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string); + { + grpc_core::MutexLock lock(&t->mu); + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { + gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string); + } + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + grpc_timer_cancel(&t->keepalive_watchdog_timer); + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init(&t->keepalive_ping_timer, + grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, + &t->init_keepalive_ping); } - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; - grpc_timer_cancel(&t->keepalive_watchdog_timer); - GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(&t->keepalive_ping_timer, - grpc_core::ExecCtx::Get()->Now() + t->keepalive_time, - &t->init_keepalive_ping); } } - gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end"); } static void keepalive_watchdog_fired(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - gpr_mu_lock(&t->mu); - if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { - if (error == GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.", - t->peer_string); - t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; - close_transport_locked( - t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "keepalive watchdog timeout"), - GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAVAILABLE)); - } - } else { - /* The watchdog timer should have been cancelled by - * finish_keepalive_ping. */ - if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) { - gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", - t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); + { + grpc_core::MutexLock lock(&t->mu); + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "%s: Keepalive watchdog fired. Closing transport.", + t->peer_string); + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; + close_transport_locked( + t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "keepalive watchdog timeout"), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE)); + } + } else { + /* The watchdog timer should have been cancelled by + * finish_keepalive_ping. */ + if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) { + gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", + t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); + } } } - gpr_mu_unlock(&t->mu); GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); } @@ -2884,7 +2885,7 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* gs, static void reset_byte_stream(void* arg, grpc_error* error) { grpc_chttp2_stream* s = static_cast(arg); - gpr_mu_lock(&s->t->mu); + grpc_core::MutexLock lock(&s->t->mu); s->pending_byte_stream = false; if (error == GRPC_ERROR_NONE) { grpc_chttp2_maybe_complete_recv_message(s->t, s); @@ -2898,7 +2899,6 @@ static void reset_byte_stream(void* arg, grpc_error* error) { grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error)); s->byte_stream_error = GRPC_ERROR_REF(error); } - gpr_mu_unlock(&s->t->mu); } namespace grpc_core { @@ -2917,13 +2917,13 @@ Chttp2IncomingByteStream::Chttp2IncomingByteStream( void Chttp2IncomingByteStream::Orphan() { GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); - grpc_chttp2_transport* t = stream_->t; - gpr_mu_lock(&t->mu); + grpc_chttp2_stream* s = stream_; + grpc_chttp2_transport* t = s->t; + grpc_core::MutexLock lock(&t->mu); Unref(); - stream_->pending_byte_stream = false; - grpc_chttp2_maybe_complete_recv_message(t, stream_); - grpc_chttp2_maybe_complete_recv_trailing_metadata(t, stream_); - gpr_mu_unlock(&t->mu); + s->pending_byte_stream = false; + grpc_chttp2_maybe_complete_recv_message(t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); } // TODO(yashykt) : Merge this with Next @@ -2931,7 +2931,7 @@ void Chttp2IncomingByteStream::NextLocked(void* arg, grpc_error* error_ignored) { Chttp2IncomingByteStream* bs = static_cast(arg); grpc_chttp2_transport* t = bs->transport_; - gpr_mu_lock(&t->mu); + grpc_core::MutexLock lock(&t->mu); grpc_chttp2_stream* s = bs->stream_; size_t cur_length = s->frame_storage.length; if (!s->read_closed) { @@ -2970,7 +2970,6 @@ void Chttp2IncomingByteStream::NextLocked(void* arg, s->on_next = bs->next_action_.on_complete; } bs->Unref(); - gpr_mu_unlock(&t->mu); } bool Chttp2IncomingByteStream::Next(size_t max_size_hint, @@ -3113,65 +3112,67 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) { static void benign_reclaimer(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - gpr_mu_lock(&t->mu); - if (error == GRPC_ERROR_NONE && - grpc_chttp2_stream_map_size(&t->stream_map) == 0) { - /* Channel with no active streams: send a goaway to try and make it - * disconnect cleanly */ - if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { - gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory", - t->peer_string); + { + grpc_core::MutexLock lock(&t->mu); + if (error == GRPC_ERROR_NONE && + grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + /* Channel with no active streams: send a goaway to try and make it + * disconnect cleanly */ + if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { + gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory", + t->peer_string); + } + send_goaway( + t, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), + GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); + } else if (error == GRPC_ERROR_NONE && + GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { + gpr_log(GPR_INFO, + "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR + " streams", + t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map)); + } + t->benign_reclaimer_registered = false; + if (error != GRPC_ERROR_CANCELLED) { + grpc_resource_user_finish_reclamation( + grpc_endpoint_get_resource_user(t->ep)); } - send_goaway(t, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), - GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); - } else if (error == GRPC_ERROR_NONE && - GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { - gpr_log(GPR_INFO, - "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR - " streams", - t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map)); - } - t->benign_reclaimer_registered = false; - if (error != GRPC_ERROR_CANCELLED) { - grpc_resource_user_finish_reclamation( - grpc_endpoint_get_resource_user(t->ep)); - } - gpr_mu_unlock(&t->mu); + } GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer"); } static void destructive_reclaimer(void* arg, grpc_error* error) { grpc_chttp2_transport* t = static_cast(arg); - gpr_mu_lock(&t->mu); - size_t n = grpc_chttp2_stream_map_size(&t->stream_map); - t->destructive_reclaimer_registered = false; - if (error == GRPC_ERROR_NONE && n > 0) { - grpc_chttp2_stream* s = static_cast( - grpc_chttp2_stream_map_rand(&t->stream_map)); - if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { - gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string, - s->id); - } - grpc_chttp2_cancel_stream( - t, s, - grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), - GRPC_ERROR_INT_HTTP2_ERROR, - GRPC_HTTP2_ENHANCE_YOUR_CALM)); - if (n > 1) { - /* Since we cancel one stream per destructive reclamation, if - there are more streams left, we can immediately post a new - reclaimer in case the resource quota needs to free more - memory */ - post_destructive_reclaimer(t); - } - } - if (error != GRPC_ERROR_CANCELLED) { - grpc_resource_user_finish_reclamation( - grpc_endpoint_get_resource_user(t->ep)); - } - gpr_mu_unlock(&t->mu); + { + grpc_core::MutexLock lock(&t->mu); + size_t n = grpc_chttp2_stream_map_size(&t->stream_map); + t->destructive_reclaimer_registered = false; + if (error == GRPC_ERROR_NONE && n > 0) { + grpc_chttp2_stream* s = static_cast( + grpc_chttp2_stream_map_rand(&t->stream_map)); + if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { + gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string, + s->id); + } + grpc_chttp2_cancel_stream( + t, s, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), + GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); + if (n > 1) { + /* Since we cancel one stream per destructive reclamation, if + there are more streams left, we can immediately post a new + reclaimer in case the resource quota needs to free more + memory */ + post_destructive_reclaimer(t); + } + } + if (error != GRPC_ERROR_CANCELLED) { + grpc_resource_user_finish_reclamation( + grpc_endpoint_get_resource_user(t->ep)); + } + } GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer"); } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc index fe6a689ac29..fb33c841428 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc @@ -1669,14 +1669,16 @@ static const maybe_complete_func_type maybe_complete_funcs[] = { static void force_client_rst_stream(void* sp, grpc_error* error) { grpc_chttp2_stream* s = static_cast(sp); grpc_chttp2_transport* t = s->t; - gpr_mu_lock(&t->mu); - if (!s->write_closed) { - grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR, - &s->stats.outgoing); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM); - grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE); + { + grpc_core::MutexLock lock(&t->mu); + if (!s->write_closed) { + grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR, + &s->stats.outgoing); + grpc_chttp2_initiate_write(t, + GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM); + grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE); + } } - gpr_mu_unlock(&t->mu); GRPC_CHTTP2_STREAM_UNREF(s, "final_rst"); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index a57dadb84ad..ac05368632e 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -293,7 +293,7 @@ struct grpc_chttp2_transport { ~grpc_chttp2_transport(); grpc_transport base; /* must be first */ - gpr_mu mu; + grpc_core::Mutex mu; grpc_core::RefCount refs; grpc_endpoint* ep; char* peer_string;