Merge pull request #19924 from yashykt/http2sec

Prevent HTTP2 parser from queueing a lot of induced frames
pull/19936/head
Yash Tibrewal 5 years ago committed by GitHub
commit f195ee0f01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 55
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 1
      src/core/ext/transport/chttp2/transport/frame_ping.cc
  3. 8
      src/core/ext/transport/chttp2/transport/frame_rst_stream.cc
  4. 7
      src/core/ext/transport/chttp2/transport/frame_rst_stream.h
  5. 1
      src/core/ext/transport/chttp2/transport/frame_settings.cc
  6. 5
      src/core/ext/transport/chttp2/transport/hpack_parser.cc
  7. 7
      src/core/ext/transport/chttp2/transport/internal.h
  8. 14
      src/core/ext/transport/chttp2/transport/parsing.cc
  9. 1
      src/core/ext/transport/chttp2/transport/writing.cc

@ -74,6 +74,8 @@
#define DEFAULT_MAX_PINGS_BETWEEN_DATA 2
#define DEFAULT_MAX_PING_STRIKES 2
#define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000
static int g_default_client_keepalive_time_ms =
DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
static int g_default_client_keepalive_timeout_ms =
@ -105,6 +107,7 @@ static void write_action(void* t, grpc_error* error);
static void write_action_end_locked(void* t, grpc_error* error);
static void read_action_locked(void* t, grpc_error* error);
static void continue_read_action_locked(grpc_chttp2_transport* t);
static void complete_fetch_locked(void* gs, grpc_error* error);
/** Set a transport level setting, and push it to our peer */
@ -797,10 +800,8 @@ grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
!grpc_resource_user_safe_alloc(t->resource_user,
GRPC_RESOURCE_QUOTA_CALL_SIZE)) {
gpr_log(GPR_ERROR, "Memory exhausted, rejecting the stream.");
grpc_slice_buffer_add(
&t->qbuf,
grpc_chttp2_rst_stream_create(
id, static_cast<uint32_t>(GRPC_HTTP2_REFUSED_STREAM), nullptr));
grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_REFUSED_STREAM,
nullptr);
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
return nullptr;
}
@ -1045,6 +1046,19 @@ static void write_action_begin_locked(void* gt, grpc_error* error_ignored) {
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&t->write_action, write_action, t, scheduler),
GRPC_ERROR_NONE);
if (t->reading_paused_on_pending_induced_frames) {
GPR_ASSERT(t->num_pending_induced_frames == 0);
/* We had paused reading, because we had many induced frames (SETTINGS
* ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
* been able to flush qbuf, we can resume reading. */
GRPC_CHTTP2_IF_TRACING(gpr_log(
GPR_INFO,
"transport %p : Resuming reading after being paused due to too "
"many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames",
t));
t->reading_paused_on_pending_induced_frames = false;
continue_read_action_locked(t);
}
} else {
GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN();
set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
@ -1114,7 +1128,6 @@ static void write_action_end_locked(void* tp, grpc_error* error) {
}
grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));
GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
}
@ -1160,7 +1173,6 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string,
goaway_error, grpc_error_string(t->goaway_error));
}
/* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
* data equal to "too_many_pings", it should log the occurrence at a log level
* that is enabled by default and double the configured KEEPALIVE_TIME used
@ -2110,10 +2122,8 @@ void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_http2_error_code http_error;
grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
&http_error, nullptr);
grpc_slice_buffer_add(
&t->qbuf,
grpc_chttp2_rst_stream_create(
s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing));
grpc_chttp2_add_rst_stream_to_next_write(
t, s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing);
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
}
}
@ -2424,9 +2434,8 @@ static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_slice_buffer_add(&t->qbuf, status_hdr);
grpc_slice_buffer_add(&t->qbuf, message_pfx);
grpc_slice_buffer_add(&t->qbuf, grpc_slice_ref_internal(slice));
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing));
grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing);
grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
@ -2596,10 +2605,16 @@ static void read_action_locked(void* tp, grpc_error* error) {
grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer);
if (keep_reading) {
const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent);
grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t,
nullptr);
if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
t->reading_paused_on_pending_induced_frames = true;
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_INFO,
"transport %p : Pausing reading due to too "
"many unwritten SETTINGS ACK and RST_STREAM frames",
t));
} else {
continue_read_action_locked(t);
}
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
}
@ -2607,6 +2622,12 @@ static void read_action_locked(void* tp, grpc_error* error) {
GRPC_ERROR_UNREF(error);
}
static void continue_read_action_locked(grpc_chttp2_transport* t) {
const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent);
grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr);
}
// t is reffed prior to calling the first time, and once the callback chain
// that kicks off finishes, it's unreffed
static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {

@ -118,6 +118,7 @@ grpc_error* grpc_chttp2_ping_parser_parse(void* parser,
t->ping_acks = static_cast<uint64_t*>(gpr_realloc(
t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks)));
}
t->num_pending_induced_frames++;
t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes;
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE);
}

@ -58,6 +58,14 @@ grpc_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code,
return slice;
}
void grpc_chttp2_add_rst_stream_to_next_write(
grpc_chttp2_transport* t, uint32_t id, uint32_t code,
grpc_transport_one_way_stats* stats) {
t->num_pending_induced_frames++;
grpc_slice_buffer_add(&t->qbuf,
grpc_chttp2_rst_stream_create(id, code, stats));
}
grpc_error* grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_rst_stream_parser* parser, uint32_t length, uint8_t flags) {
if (length != 4) {

@ -33,6 +33,13 @@ typedef struct {
grpc_slice grpc_chttp2_rst_stream_create(uint32_t stream_id, uint32_t code,
grpc_transport_one_way_stats* stats);
// Adds RST_STREAM frame to t->qbuf (buffer for the next write). Should be
// called when we want to add RST_STREAM and we are not in
// write_action_begin_locked.
void grpc_chttp2_add_rst_stream_to_next_write(
grpc_chttp2_transport* t, uint32_t id, uint32_t code,
grpc_transport_one_way_stats* stats);
grpc_error* grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_rst_stream_parser* parser, uint32_t length, uint8_t flags);
grpc_error* grpc_chttp2_rst_stream_parser_parse(void* parser,

@ -132,6 +132,7 @@ grpc_error* grpc_chttp2_settings_parser_parse(void* p, grpc_chttp2_transport* t,
if (is_last) {
memcpy(parser->target_settings, parser->incoming_settings,
GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t));
t->num_pending_induced_frames++;
grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
if (t->notify_on_receive_settings != nullptr) {
GRPC_CLOSURE_SCHED(t->notify_on_receive_settings,

@ -1671,9 +1671,8 @@ static void force_client_rst_stream(void* sp, grpc_error* error) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
grpc_chttp2_transport* t = s->t;
if (!s->write_closed) {
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing));
grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing);
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM);
grpc_chttp2_mark_stream_closed(t, s, true, true, GRPC_ERROR_NONE);
}

@ -493,6 +493,13 @@ struct grpc_chttp2_transport {
grpc_core::ContextList* cl = nullptr;
grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket;
uint32_t num_messages_in_next_write = 0;
/** The number of pending induced frames (SETTINGS_ACK, PINGS_ACK and
* RST_STREAM) in the outgoing buffer (t->qbuf). If this number goes beyond
* DEFAULT_MAX_PENDING_INDUCED_FRAMES, we pause reading new frames. We would
* only continue reading when we are able to write to the socket again,
* thereby reducing the number of induced frames. */
uint32_t num_pending_induced_frames = 0;
bool reading_paused_on_pending_induced_frames = false;
};
typedef enum {

@ -382,10 +382,9 @@ error_handler:
if (s != nullptr) {
grpc_chttp2_mark_stream_closed(t, s, true, false, err);
}
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(t->incoming_stream_id,
GRPC_HTTP2_PROTOCOL_ERROR,
&s->stats.outgoing));
grpc_chttp2_add_rst_stream_to_next_write(t, t->incoming_stream_id,
GRPC_HTTP2_PROTOCOL_ERROR,
&s->stats.outgoing);
return init_skip_frame_parser(t, 0);
} else {
return err;
@ -763,10 +762,9 @@ static grpc_error* parse_frame_slice(grpc_chttp2_transport* t,
grpc_chttp2_parsing_become_skip_parser(t);
if (s) {
s->forced_close_error = err;
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(t->incoming_stream_id,
GRPC_HTTP2_PROTOCOL_ERROR,
&s->stats.outgoing));
grpc_chttp2_add_rst_stream_to_next_write(t, t->incoming_stream_id,
GRPC_HTTP2_PROTOCOL_ERROR,
&s->stats.outgoing);
} else {
GRPC_ERROR_UNREF(err);
}

@ -219,6 +219,7 @@ class WriteContext {
void FlushQueuedBuffers() {
/* simple writes are queued to qbuf, and flushed here */
grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf);
t_->num_pending_induced_frames = 0;
GPR_ASSERT(t_->qbuf.count == 0);
}

Loading…
Cancel
Save