From 9469ac4a7a355aede1b386d52eeb937de8a6e456 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Fri, 21 Jul 2017 12:50:35 -0700 Subject: [PATCH 1/2] transport - Move slice buffer creation to stream compression enabling time (PR 3) for performance --- .../chttp2/transport/chttp2_transport.c | 26 +++++++++++-------- .../ext/transport/chttp2/transport/internal.h | 4 +-- .../ext/transport/chttp2/transport/writing.c | 21 ++++++++------- 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 41516ed91a6..212704a1219 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -668,8 +668,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena); grpc_chttp2_data_parser_init(&s->data_parser); grpc_slice_buffer_init(&s->flow_controlled_buffer); - grpc_slice_buffer_init(&s->compressed_data_buffer); - grpc_slice_buffer_init(&s->decompressed_data_buffer); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); GRPC_CLOSURE_INIT(&s->complete_fetch_locked, complete_fetch_locked, s, grpc_schedule_on_exec_ctx); @@ -708,8 +706,14 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_destroy_internal(exec_ctx, &s->unprocessed_incoming_frames_buffer); grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); - grpc_slice_buffer_destroy_internal(exec_ctx, &s->compressed_data_buffer); - grpc_slice_buffer_destroy_internal(exec_ctx, &s->decompressed_data_buffer); + if (s->compressed_data_buffer) { + grpc_slice_buffer_destroy_internal(exec_ctx, s->compressed_data_buffer); + gpr_free(s->compressed_data_buffer); + } + if (s->decompressed_data_buffer) { + grpc_slice_buffer_destroy_internal(exec_ctx, s->decompressed_data_buffer); + gpr_free(s->decompressed_data_buffer); + } grpc_chttp2_list_remove_stalled_by_transport(t, s); grpc_chttp2_list_remove_stalled_by_stream(t, s); @@ -1671,7 +1675,7 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, } if (s->stream_compression_recv_enabled && !s->unprocessed_incoming_frames_decompressed) { - GPR_ASSERT(s->decompressed_data_buffer.length == 0); + GPR_ASSERT(s->decompressed_data_buffer->length == 0); bool end_of_context; if (!s->stream_decompression_ctx) { s->stream_decompression_ctx = @@ -1680,7 +1684,7 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, } if (!grpc_stream_decompress(s->stream_decompression_ctx, &s->unprocessed_incoming_frames_buffer, - &s->decompressed_data_buffer, NULL, + s->decompressed_data_buffer, NULL, GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, @@ -1691,8 +1695,8 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, "Stream decompression error."); } else { error = grpc_deframe_unprocessed_incoming_frames( - exec_ctx, &s->data_parser, s, &s->decompressed_data_buffer, - NULL, s->recv_message); + exec_ctx, &s->data_parser, s, s->decompressed_data_buffer, NULL, + s->recv_message); if (end_of_context) { grpc_stream_compression_context_destroy( s->stream_decompression_ctx); @@ -2713,15 +2717,15 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, } if (!grpc_stream_decompress(s->stream_decompression_ctx, &s->unprocessed_incoming_frames_buffer, - &s->decompressed_data_buffer, NULL, - MAX_SIZE_T, &end_of_context)) { + s->decompressed_data_buffer, NULL, MAX_SIZE_T, + &end_of_context)) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error."); return error; } GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, - &s->decompressed_data_buffer); + s->decompressed_data_buffer); s->unprocessed_incoming_frames_decompressed = true; if (end_of_context) { grpc_stream_compression_context_destroy(s->stream_decompression_ctx); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 9fad6835167..eb1acc0f132 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -533,12 +533,12 @@ struct grpc_chttp2_stream { grpc_stream_compression_context *stream_compression_ctx; /** Buffer storing data that is compressed but not sent */ - grpc_slice_buffer compressed_data_buffer; + grpc_slice_buffer *compressed_data_buffer; /** Amount of uncompressed bytes sent out when compressed_data_buffer is * emptied */ size_t uncompressed_data_size; /** Temporary buffer storing decompressed data */ - grpc_slice_buffer decompressed_data_buffer; + grpc_slice_buffer *decompressed_data_buffer; }; /** Transport writing call flow: diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 08babb73692..2e3e9ee28bf 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -304,7 +304,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( if (sent_initial_metadata) { /* send any body bytes, if allowed by flow control */ if (s->flow_controlled_buffer.length > 0 || - s->compressed_data_buffer.length > 0) { + (s->stream_compression_send_enabled && + s->compressed_data_buffer->length > 0)) { uint32_t stream_outgoing_window = (uint32_t)GPR_MAX( 0, s->outgoing_window_delta + @@ -319,19 +320,19 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( bool is_last_frame = false; if (s->stream_compression_send_enabled) { while ((s->flow_controlled_buffer.length > 0 || - s->compressed_data_buffer.length > 0) && + s->compressed_data_buffer->length > 0) && max_outgoing > 0) { - if (s->compressed_data_buffer.length > 0) { + if (s->compressed_data_buffer->length > 0) { uint32_t send_bytes = (uint32_t)GPR_MIN( - max_outgoing, s->compressed_data_buffer.length); + max_outgoing, s->compressed_data_buffer->length); is_last_data_frame = - (send_bytes == s->compressed_data_buffer.length && + (send_bytes == s->compressed_data_buffer->length && s->flow_controlled_buffer.length == 0 && s->fetching_send_message == NULL); is_last_frame = is_last_data_frame && s->send_trailing_metadata != NULL && grpc_metadata_batch_is_empty(s->send_trailing_metadata); - grpc_chttp2_encode_data(s->id, &s->compressed_data_buffer, + grpc_chttp2_encode_data(s->id, s->compressed_data_buffer, send_bytes, is_last_frame, &s->stats.outgoing, &t->outbuf); GRPC_CHTTP2_FLOW_DEBIT_STREAM( @@ -339,7 +340,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, send_bytes); max_outgoing -= send_bytes; - if (s->compressed_data_buffer.length == 0) { + if (s->compressed_data_buffer->length == 0) { s->sending_bytes += s->uncompressed_data_size; } } else { @@ -351,7 +352,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( s->uncompressed_data_size = s->flow_controlled_buffer.length; GPR_ASSERT(grpc_stream_compress( s->stream_compression_ctx, &s->flow_controlled_buffer, - &s->compressed_data_buffer, NULL, MAX_SIZE_T, + s->compressed_data_buffer, NULL, MAX_SIZE_T, GRPC_STREAM_COMPRESSION_FLUSH_SYNC)); } } @@ -390,7 +391,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } now_writing = true; if (s->flow_controlled_buffer.length > 0 || - s->compressed_data_buffer.length > 0) { + s->compressed_data_buffer->length > 0) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t, s); } @@ -405,7 +406,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( if (s->send_trailing_metadata != NULL && s->fetching_send_message == NULL && s->flow_controlled_buffer.length == 0 && - s->compressed_data_buffer.length == 0) { + s->compressed_data_buffer->length == 0) { GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata")); if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) { grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true, From c7a94c5052a2409ac68b991ae116d5f9e60eb545 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Fri, 21 Jul 2017 14:04:26 -0700 Subject: [PATCH 2/2] Bug fix --- src/core/ext/transport/chttp2/transport/writing.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 2e3e9ee28bf..c3ede08343a 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -391,7 +391,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } now_writing = true; if (s->flow_controlled_buffer.length > 0 || - s->compressed_data_buffer->length > 0) { + (s->stream_compression_send_enabled && + s->compressed_data_buffer->length > 0)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t, s); } @@ -406,7 +407,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( if (s->send_trailing_metadata != NULL && s->fetching_send_message == NULL && s->flow_controlled_buffer.length == 0 && - s->compressed_data_buffer->length == 0) { + (!s->stream_compression_send_enabled || + s->compressed_data_buffer->length == 0)) { GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata")); if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) { grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true,