diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 16146569886..8e81e0a5c2b 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -74,6 +74,8 @@ #define DEFAULT_MAX_PINGS_BETWEEN_DATA 2 #define DEFAULT_MAX_PING_STRIKES 2 +#define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000 + static int g_default_client_keepalive_time_ms = DEFAULT_CLIENT_KEEPALIVE_TIME_MS; static int g_default_client_keepalive_timeout_ms = @@ -105,6 +107,7 @@ static void write_action(void* t, grpc_error* error); static void write_action_end_locked(void* t, grpc_error* error); static void read_action_locked(void* t, grpc_error* error); +static void continue_read_action_locked(grpc_chttp2_transport* t); static void complete_fetch_locked(void* gs, grpc_error* error); /** Set a transport level setting, and push it to our peer */ @@ -797,10 +800,8 @@ grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t, !grpc_resource_user_safe_alloc(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE)) { gpr_log(GPR_ERROR, "Memory exhausted, rejecting the stream."); - grpc_slice_buffer_add( - &t->qbuf, - grpc_chttp2_rst_stream_create( - id, static_cast(GRPC_HTTP2_REFUSED_STREAM), nullptr)); + grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_REFUSED_STREAM, + nullptr); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); return nullptr; } @@ -1045,6 +1046,19 @@ static void write_action_begin_locked(void* gt, grpc_error* error_ignored) { GRPC_CLOSURE_SCHED( GRPC_CLOSURE_INIT(&t->write_action, write_action, t, scheduler), GRPC_ERROR_NONE); + if (t->reading_paused_on_pending_induced_frames) { + GPR_ASSERT(t->num_pending_induced_frames == 0); + /* We had paused reading, because we had many induced frames (SETTINGS + * ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have + * been able to flush qbuf, we can resume reading. */ + GRPC_CHTTP2_IF_TRACING(gpr_log( + GPR_INFO, + "transport %p : Resuming reading after being paused due to too " + "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames", + t)); + t->reading_paused_on_pending_induced_frames = false; + continue_read_action_locked(t); + } } else { GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN(); set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); @@ -1114,7 +1128,6 @@ static void write_action_end_locked(void* tp, grpc_error* error) { } grpc_chttp2_end_write(t, GRPC_ERROR_REF(error)); - GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); } @@ -2113,10 +2126,8 @@ void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s, grpc_http2_error_code http_error; grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr, &http_error, nullptr); - grpc_slice_buffer_add( - &t->qbuf, - grpc_chttp2_rst_stream_create( - s->id, static_cast(http_error), &s->stats.outgoing)); + grpc_chttp2_add_rst_stream_to_next_write( + t, s->id, static_cast(http_error), &s->stats.outgoing); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); } } @@ -2427,9 +2438,8 @@ static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s, grpc_slice_buffer_add(&t->qbuf, status_hdr); grpc_slice_buffer_add(&t->qbuf, message_pfx); grpc_slice_buffer_add(&t->qbuf, grpc_slice_ref_internal(slice)); - grpc_slice_buffer_add( - &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR, - &s->stats.outgoing)); + grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR, + &s->stats.outgoing); grpc_chttp2_mark_stream_closed(t, s, 1, 1, error); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API); @@ -2600,10 +2610,16 @@ static void read_action_locked(void* tp, grpc_error* error) { grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer); if (keep_reading) { - const bool urgent = t->goaway_error != GRPC_ERROR_NONE; - grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent); - grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, - nullptr); + if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) { + t->reading_paused_on_pending_induced_frames = true; + GRPC_CHTTP2_IF_TRACING( + gpr_log(GPR_INFO, + "transport %p : Pausing reading due to too " + "many unwritten SETTINGS ACK and RST_STREAM frames", + t)); + } else { + continue_read_action_locked(t); + } GRPC_CHTTP2_UNREF_TRANSPORT(t, "keep_reading"); } else { GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action"); @@ -2612,6 +2628,12 @@ static void read_action_locked(void* tp, grpc_error* error) { GRPC_ERROR_UNREF(error); } +static void continue_read_action_locked(grpc_chttp2_transport* t) { + const bool urgent = t->goaway_error != GRPC_ERROR_NONE; + grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent); + grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr); +} + // t is reffed prior to calling the first time, and once the callback chain // that kicks off finishes, it's unreffed static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) { diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.cc b/src/core/ext/transport/chttp2/transport/frame_ping.cc index 9a56bf093f4..87c92dffc38 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.cc +++ b/src/core/ext/transport/chttp2/transport/frame_ping.cc @@ -118,6 +118,7 @@ grpc_error* grpc_chttp2_ping_parser_parse(void* parser, t->ping_acks = static_cast(gpr_realloc( t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks))); } + t->num_pending_induced_frames++; t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes; grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE); } diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc b/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc index cda09c3dea1..1350a967b9f 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.cc @@ -58,6 +58,14 @@ grpc_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code, return slice; } +void grpc_chttp2_add_rst_stream_to_next_write( + grpc_chttp2_transport* t, uint32_t id, uint32_t code, + grpc_transport_one_way_stats* stats) { + t->num_pending_induced_frames++; + grpc_slice_buffer_add(&t->qbuf, + grpc_chttp2_rst_stream_create(id, code, stats)); +} + grpc_error* grpc_chttp2_rst_stream_parser_begin_frame( grpc_chttp2_rst_stream_parser* parser, uint32_t length, uint8_t flags) { if (length != 4) { diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h index 64707666181..d61e62394a4 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h @@ -33,6 +33,13 @@ typedef struct { grpc_slice grpc_chttp2_rst_stream_create(uint32_t stream_id, uint32_t code, grpc_transport_one_way_stats* stats); +// Adds RST_STREAM frame to t->qbuf (buffer for the next write). Should be +// called when we want to add RST_STREAM and we are not in +// write_action_begin_locked. +void grpc_chttp2_add_rst_stream_to_next_write( + grpc_chttp2_transport* t, uint32_t id, uint32_t code, + grpc_transport_one_way_stats* stats); + grpc_error* grpc_chttp2_rst_stream_parser_begin_frame( grpc_chttp2_rst_stream_parser* parser, uint32_t length, uint8_t flags); grpc_error* grpc_chttp2_rst_stream_parser_parse(void* parser, diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc index 3f84679ec31..ba57afa74b9 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.cc +++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc @@ -132,6 +132,7 @@ grpc_error* grpc_chttp2_settings_parser_parse(void* p, grpc_chttp2_transport* t, if (is_last) { memcpy(parser->target_settings, parser->incoming_settings, GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t)); + t->num_pending_induced_frames++; grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create()); if (t->notify_on_receive_settings != nullptr) { GRPC_CLOSURE_SCHED(t->notify_on_receive_settings, diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc index 724c6c1bffc..22c0962cd49 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc @@ -1668,9 +1668,8 @@ static void force_client_rst_stream(void* sp, grpc_error* error) { grpc_chttp2_stream* s = static_cast(sp); grpc_chttp2_transport* t = s->t; if (!s->write_closed) { - grpc_slice_buffer_add( - &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR, - &s->stats.outgoing)); + grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR, + &s->stats.outgoing); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM); grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 643cd25b8bd..e6fc3b599e0 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -493,6 +493,13 @@ struct grpc_chttp2_transport { grpc_core::ContextList* cl = nullptr; grpc_core::RefCountedPtr channelz_socket; uint32_t num_messages_in_next_write = 0; + /** The number of pending induced frames (SETTINGS_ACK, PINGS_ACK and + * RST_STREAM) in the outgoing buffer (t->qbuf). If this number goes beyond + * DEFAULT_MAX_PENDING_INDUCED_FRAMES, we pause reading new frames. We would + * only continue reading when we are able to write to the socket again, + * thereby reducing the number of induced frames. */ + uint32_t num_pending_induced_frames = 0; + bool reading_paused_on_pending_induced_frames = false; }; typedef enum { diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 4e6ff60caf8..119a5941d38 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -382,10 +382,9 @@ error_handler: if (s != nullptr) { grpc_chttp2_mark_stream_closed(t, s, true, false, err); } - grpc_slice_buffer_add( - &t->qbuf, grpc_chttp2_rst_stream_create(t->incoming_stream_id, - GRPC_HTTP2_PROTOCOL_ERROR, - &s->stats.outgoing)); + grpc_chttp2_add_rst_stream_to_next_write(t, t->incoming_stream_id, + GRPC_HTTP2_PROTOCOL_ERROR, + &s->stats.outgoing); return init_skip_frame_parser(t, 0); } else { return err; @@ -765,10 +764,9 @@ static grpc_error* parse_frame_slice(grpc_chttp2_transport* t, grpc_chttp2_parsing_become_skip_parser(t); if (s) { s->forced_close_error = err; - grpc_slice_buffer_add( - &t->qbuf, grpc_chttp2_rst_stream_create(t->incoming_stream_id, - GRPC_HTTP2_PROTOCOL_ERROR, - &s->stats.outgoing)); + grpc_chttp2_add_rst_stream_to_next_write(t, t->incoming_stream_id, + GRPC_HTTP2_PROTOCOL_ERROR, + &s->stats.outgoing); } else { GRPC_ERROR_UNREF(err); } diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index f3cb390dc7a..d6d9e4521f6 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -219,6 +219,7 @@ class WriteContext { void FlushQueuedBuffers() { /* simple writes are queued to qbuf, and flushed here */ grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf); + t_->num_pending_induced_frames = 0; GPR_ASSERT(t_->qbuf.count == 0); }