diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index b3372d7c31f..e333b42ad66 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -142,6 +142,11 @@ typedef struct { /** How much memory to use for hpack encoding. Int valued, bytes. */ #define GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER \ "grpc.http2.hpack_table_size.encoder" +/** How big a frame are we willing to receive via HTTP2. + Min 16384, max 16777215. + Larger values give lower CPU usage for large messages, but more head of line + blocking for small messages. */ +#define GRPC_ARG_HTTP2_MAX_FRAME_SIZE "grpc.http2.max_frame_size" /** Default authority to pass if none specified on call construction. A string. * */ #define GRPC_ARG_DEFAULT_AUTHORITY "grpc.default_authority" diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 0e8dfb7d968..00999e3b940 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -408,6 +408,19 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, (uint32_t)channel_args->args[i].value.integer); } + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MAX_FRAME_SIZE)) { + if (channel_args->args[i].type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s: must be an integer", + GRPC_ARG_HTTP2_MAX_FRAME_SIZE); + } else if (channel_args->args[i].value.integer < 16384 || + channel_args->args[i].value.integer > 16777215) { + gpr_log(GPR_ERROR, "%s: must be between 16384 and 16777215", + GRPC_ARG_HTTP2_MAX_FRAME_SIZE); + } else { + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + (uint32_t)channel_args->args[i].value.integer); + } } } } diff --git a/src/core/ext/transport/chttp2/transport/frame.h b/src/core/ext/transport/chttp2/transport/frame.h index 7776609367b..507aae41003 100644 --- a/src/core/ext/transport/chttp2/transport/frame.h +++ b/src/core/ext/transport/chttp2/transport/frame.h @@ -52,8 +52,6 @@ typedef struct grpc_chttp2_transport_parsing grpc_chttp2_transport_parsing; #define GRPC_CHTTP2_FRAME_GOAWAY 7 #define GRPC_CHTTP2_FRAME_WINDOW_UPDATE 8 -#define GRPC_CHTTP2_MAX_PAYLOAD_LENGTH ((1 << 14) - 1) - #define GRPC_CHTTP2_DATA_FLAG_END_STREAM 1 #define GRPC_CHTTP2_FLAG_ACK 1 #define GRPC_CHTTP2_DATA_FLAG_END_HEADERS 4 diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c index 2cb8205d942..581471ba02d 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c @@ -78,6 +78,8 @@ typedef struct { uint32_t stream_id; gpr_slice_buffer *output; grpc_transport_one_way_stats *stats; + /* maximum size of a frame */ + size_t max_frame_size; } framer_state; /* fills p (which is expected to be 9 bytes long) with a data frame header */ @@ -123,7 +125,7 @@ static void begin_frame(framer_state *st) { needed */ static void ensure_space(framer_state *st, size_t need_bytes) { if (st->output->length - st->output_length_at_start_of_frame + need_bytes <= - GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) { + st->max_frame_size) { return; } finish_frame(st, 0, 0); @@ -149,8 +151,8 @@ static void add_header_data(framer_state *st, gpr_slice slice) { size_t len = GPR_SLICE_LENGTH(slice); size_t remaining; if (len == 0) return; - remaining = GRPC_CHTTP2_MAX_PAYLOAD_LENGTH + - st->output_length_at_start_of_frame - st->output->length; + remaining = st->max_frame_size + st->output_length_at_start_of_frame - + st->output->length; if (len <= remaining) { st->stats->header_bytes += len; gpr_slice_buffer_add(st->output, slice); @@ -542,6 +544,7 @@ void grpc_chttp2_hpack_compressor_set_max_table_size( void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, uint32_t stream_id, grpc_metadata_batch *metadata, int is_eof, + size_t max_frame_size, grpc_transport_one_way_stats *stats, gpr_slice_buffer *outbuf) { framer_state st; @@ -555,6 +558,7 @@ void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, st.output = outbuf; st.is_first_frame = 1; st.stats = stats; + st.max_frame_size = max_frame_size; /* Encode a metadata batch; store the returned values, representing a metadata element that needs to be unreffed back into the metadata diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.h b/src/core/ext/transport/chttp2/transport/hpack_encoder.h index 0f7b0b063ab..4c3a9315495 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.h +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.h @@ -91,6 +91,7 @@ void grpc_chttp2_hpack_compressor_set_max_usable_size( void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, uint32_t id, grpc_metadata_batch *metadata, int is_eof, + size_t max_frame_size, grpc_transport_one_way_stats *stats, gpr_slice_buffer *outbuf); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index e1dcf5262a1..d67c014e548 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -223,6 +223,8 @@ typedef struct { uint8_t is_client; /** callback for when writing is done */ grpc_closure done_cb; + /** maximum frame size */ + uint32_t max_frame_size; } grpc_chttp2_transport_writing; struct grpc_chttp2_transport_parsing { diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 8f7a1f55c6e..311b26e354a 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -51,6 +51,10 @@ int grpc_chttp2_unlocking_check_writes( GPR_TIMER_BEGIN("grpc_chttp2_unlocking_check_writes", 0); + transport_writing->max_frame_size = + transport_global->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; + /* simple writes are queued to qbuf, and flushed here */ gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf); GPR_ASSERT(transport_global->qbuf.count == 0); @@ -206,14 +210,15 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, while ( grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { uint32_t max_outgoing = - (uint32_t)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH, + (uint32_t)GPR_MIN(transport_writing->max_frame_size, GPR_MIN(stream_writing->outgoing_window, transport_writing->outgoing_window)); /* send initial metadata if it's available */ if (stream_writing->send_initial_metadata != NULL) { grpc_chttp2_encode_header( &transport_writing->hpack_compressor, stream_writing->id, - stream_writing->send_initial_metadata, 0, &stream_writing->stats, + stream_writing->send_initial_metadata, 0, + transport_writing->max_frame_size, &stream_writing->stats, &transport_writing->outbuf); stream_writing->send_initial_metadata = NULL; stream_writing->sent_initial_metadata = 1; @@ -303,7 +308,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, } else { grpc_chttp2_encode_header( &transport_writing->hpack_compressor, stream_writing->id, - stream_writing->send_trailing_metadata, 1, &stream_writing->stats, + stream_writing->send_trailing_metadata, 1, + transport_writing->max_frame_size, &stream_writing->stats, &transport_writing->outbuf); } if (!transport_writing->is_client && !stream_writing->read_closed) { diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c index a62fb019304..1df237cb6c4 100644 --- a/test/core/end2end/tests/invoke_large_request.c +++ b/test/core/end2end/tests/invoke_large_request.c @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include "test/core/end2end/cq_verifier.h" @@ -99,9 +100,25 @@ static gpr_slice large_slice(void) { return slice; } -static void test_invoke_large_request(grpc_end2end_test_config config) { +static void test_invoke_large_request(grpc_end2end_test_config config, + int max_frame_size, int lookahead_bytes) { + char *name; + gpr_asprintf(&name, + "test_invoke_large_request:max_frame_size=%d:lookahead_bytes=%d", + max_frame_size, lookahead_bytes); + + grpc_arg args[2]; + args[0].type = GRPC_ARG_INTEGER; + args[0].key = GRPC_ARG_HTTP2_MAX_FRAME_SIZE; + args[0].value.integer = max_frame_size; + args[1].type = GRPC_ARG_INTEGER; + args[1].key = GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES; + args[1].value.integer = lookahead_bytes; + grpc_channel_args channel_args = {GPR_ARRAY_SIZE(args), args}; + grpc_end2end_test_fixture f = - begin_test(config, "test_invoke_large_request", NULL, NULL); + begin_test(config, name, &channel_args, &channel_args); + gpr_free(name); gpr_slice request_payload_slice = large_slice(); gpr_slice response_payload_slice = large_slice(); @@ -253,7 +270,26 @@ static void test_invoke_large_request(grpc_end2end_test_config config) { } void invoke_large_request(grpc_end2end_test_config config) { - test_invoke_large_request(config); + test_invoke_large_request(config, 16384, 65536); + test_invoke_large_request(config, 32768, 65536); + + test_invoke_large_request(config, 1000000 - 1, 65536); + test_invoke_large_request(config, 1000000, 65536); + test_invoke_large_request(config, 1000000 + 1, 65536); + test_invoke_large_request(config, 1000000 + 2, 65536); + test_invoke_large_request(config, 1000000 + 3, 65536); + test_invoke_large_request(config, 1000000 + 4, 65536); + test_invoke_large_request(config, 1000000 + 5, 65536); + test_invoke_large_request(config, 1000000 + 6, 65536); + + test_invoke_large_request(config, 1000000 - 1, 2000000); + test_invoke_large_request(config, 1000000, 2000000); + test_invoke_large_request(config, 1000000 + 1, 2000000); + test_invoke_large_request(config, 1000000 + 2, 2000000); + test_invoke_large_request(config, 1000000 + 3, 2000000); + test_invoke_large_request(config, 1000000 + 4, 2000000); + test_invoke_large_request(config, 1000000 + 5, 2000000); + test_invoke_large_request(config, 1000000 + 6, 2000000); } void invoke_large_request_pre_init(void) {} diff --git a/test/core/transport/chttp2/hpack_encoder_test.c b/test/core/transport/chttp2/hpack_encoder_test.c index 186bb6406fd..1c1c74879be 100644 --- a/test/core/transport/chttp2/hpack_encoder_test.c +++ b/test/core/transport/chttp2/hpack_encoder_test.c @@ -97,7 +97,7 @@ static void verify(size_t window_available, int eof, size_t expect_window_used, grpc_transport_one_way_stats stats; memset(&stats, 0, sizeof(stats)); - grpc_chttp2_encode_header(&g_compressor, 0xdeadbeef, &b, eof, &stats, + grpc_chttp2_encode_header(&g_compressor, 0xdeadbeef, &b, eof, 16384, &stats, &output); merged = grpc_slice_merge(output.slices, output.count); gpr_slice_buffer_destroy(&output); @@ -202,7 +202,8 @@ static void verify_table_size_change_match_elem_size(const char *key, grpc_transport_one_way_stats stats; memset(&stats, 0, sizeof(stats)); - grpc_chttp2_encode_header(&g_compressor, 0xdeadbeef, &b, 0, &stats, &output); + grpc_chttp2_encode_header(&g_compressor, 0xdeadbeef, &b, 0, 16384, &stats, + &output); gpr_slice_buffer_destroy(&output); grpc_metadata_batch_destroy(&b);