From c3229b777ce58f49b092a92a07b18155d1ae7799 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 5 Sep 2017 17:57:19 -0700 Subject: [PATCH] Remove duplicate sentences on recv path --- .../chttp2/transport/chttp2_transport.c | 37 ++++++++----------- .../transport/chttp2/transport/hpack_parser.c | 13 ++----- .../ext/transport/chttp2/transport/internal.h | 9 ++--- .../compression/stream_compression_identity.c | 1 + 4 files changed, 23 insertions(+), 37 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 64a5e342feb..ad0521cc42f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -693,6 +693,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer); grpc_slice_buffer_init(&s->frame_storage); + grpc_slice_buffer_init(&s->compressed_data_buffer); + grpc_slice_buffer_init(&s->decompressed_data_buffer); s->pending_byte_stream = false; GRPC_CLOSURE_INIT(&s->reset_byte_stream, reset_byte_stream, s, grpc_combiner_scheduler(t->combiner)); @@ -728,10 +730,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, &s->unprocessed_incoming_frames_buffer); grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); grpc_slice_buffer_destroy_internal(exec_ctx, &s->compressed_data_buffer); - if (s->decompressed_data_buffer) { - grpc_slice_buffer_destroy_internal(exec_ctx, s->decompressed_data_buffer); - gpr_free(s->decompressed_data_buffer); - } + grpc_slice_buffer_destroy_internal(exec_ctx, &s->decompressed_data_buffer); grpc_chttp2_list_remove_stalled_by_transport(t, s); grpc_chttp2_list_remove_stalled_by_stream(t, s); @@ -1306,7 +1305,6 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, true, &s->stream_compression_method) == 0) { s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS; } - grpc_slice_buffer_init(&s->compressed_data_buffer); s->send_initial_metadata_finished = add_closure_barrier(on_complete); s->send_initial_metadata = @@ -1725,18 +1723,17 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, &s->frame_storage); s->unprocessed_incoming_frames_decompressed = false; } - if (s->stream_compression_recv_enabled && - !s->unprocessed_incoming_frames_decompressed) { - GPR_ASSERT(s->decompressed_data_buffer->length == 0); + if (!s->unprocessed_incoming_frames_decompressed) { + GPR_ASSERT(s->decompressed_data_buffer.length == 0); bool end_of_context; if (!s->stream_decompression_ctx) { s->stream_decompression_ctx = grpc_stream_compression_context_create( - GRPC_STREAM_COMPRESSION_GZIP_DECOMPRESS); + s->stream_decompression_method); } if (!grpc_stream_decompress(s->stream_decompression_ctx, &s->unprocessed_incoming_frames_buffer, - s->decompressed_data_buffer, NULL, + &s->decompressed_data_buffer, NULL, GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, @@ -1747,7 +1744,7 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, "Stream decompression error."); } else { error = grpc_deframe_unprocessed_incoming_frames( - exec_ctx, &s->data_parser, s, s->decompressed_data_buffer, NULL, + exec_ctx, &s->data_parser, s, &s->decompressed_data_buffer, NULL, s->recv_message); if (end_of_context) { grpc_stream_compression_context_destroy( @@ -1755,10 +1752,6 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, s->stream_decompression_ctx = NULL; } } - } else { - error = grpc_deframe_unprocessed_incoming_frames( - exec_ctx, &s->data_parser, s, - &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message); } if (error != GRPC_ERROR_NONE) { s->seen_error = true; @@ -1797,7 +1790,7 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } bool pending_data = s->pending_byte_stream || s->unprocessed_incoming_frames_buffer.length > 0; - if (s->stream_compression_recv_enabled && s->read_closed && + if (s->read_closed && s->frame_storage.length > 0 && !pending_data && !s->seen_error && s->recv_trailing_metadata_finished != NULL) { /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and @@ -1805,7 +1798,7 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, bool end_of_context; if (!s->stream_decompression_ctx) { s->stream_decompression_ctx = grpc_stream_compression_context_create( - GRPC_STREAM_COMPRESSION_GZIP_DECOMPRESS); + s->stream_decompression_method); } if (!grpc_stream_decompress(s->stream_decompression_ctx, &s->frame_storage, @@ -1818,6 +1811,7 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } else { if (s->unprocessed_incoming_frames_buffer.length > 0) { s->unprocessed_incoming_frames_decompressed = true; + pending_data = true; } if (end_of_context) { grpc_stream_compression_context_destroy(s->stream_decompression_ctx); @@ -2690,16 +2684,15 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_error *error; if (s->unprocessed_incoming_frames_buffer.length > 0) { - if (s->stream_compression_recv_enabled && - !s->unprocessed_incoming_frames_decompressed) { + if (!s->unprocessed_incoming_frames_decompressed) { bool end_of_context; if (!s->stream_decompression_ctx) { s->stream_decompression_ctx = grpc_stream_compression_context_create( - GRPC_STREAM_COMPRESSION_GZIP_DECOMPRESS); + s->stream_decompression_method); } if (!grpc_stream_decompress(s->stream_decompression_ctx, &s->unprocessed_incoming_frames_buffer, - s->decompressed_data_buffer, NULL, MAX_SIZE_T, + &s->decompressed_data_buffer, NULL, MAX_SIZE_T, &end_of_context)) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error."); @@ -2707,7 +2700,7 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, } GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, - s->decompressed_data_buffer); + &s->decompressed_data_buffer); s->unprocessed_incoming_frames_decompressed = true; if (end_of_context) { grpc_stream_compression_context_destroy(s->stream_decompression_ctx); diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index c21d76ba710..3b338da288d 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1659,16 +1659,9 @@ static void parse_stream_compression_md(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_metadata_batch *initial_metadata) { - if (initial_metadata->idx.named.content_encoding != NULL) { - grpc_slice content_encoding = - GRPC_MDVALUE(initial_metadata->idx.named.content_encoding->md); - if (!grpc_slice_eq(content_encoding, GRPC_MDSTR_IDENTITY)) { - if (grpc_slice_eq(content_encoding, GRPC_MDSTR_GZIP)) { - s->stream_compression_recv_enabled = true; - s->decompressed_data_buffer = gpr_malloc(sizeof(grpc_slice_buffer)); - grpc_slice_buffer_init(s->decompressed_data_buffer); - } - } + if (initial_metadata->idx.named.content_encoding == NULL || + grpc_stream_compression_method_parse(GRPC_MDVALUE(initial_metadata->idx.named.content_encoding->md), false, &s->stream_decompression_method) == 0) { + s->stream_decompression_method = GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS; } } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index e5e4dd33443..33cc962f64e 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -561,10 +561,6 @@ struct grpc_chttp2_stream { grpc_stream_compression_method stream_compression_method; /* Stream decompression method to be used. */ grpc_stream_compression_method stream_decompression_method; - bool stream_compression_recv_enabled; - /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed - */ - bool unprocessed_incoming_frames_decompressed; /** Stream compression decompress context */ grpc_stream_compression_context *stream_decompression_ctx; /** Stream compression compress context */ @@ -576,7 +572,10 @@ struct grpc_chttp2_stream { * emptied */ size_t uncompressed_data_size; /** Temporary buffer storing decompressed data */ - grpc_slice_buffer *decompressed_data_buffer; + grpc_slice_buffer decompressed_data_buffer; + /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed + */ + bool unprocessed_incoming_frames_decompressed; }; /** Transport writing call flow: diff --git a/src/core/lib/compression/stream_compression_identity.c b/src/core/lib/compression/stream_compression_identity.c index 133c3250d27..91ee06d7831 100644 --- a/src/core/lib/compression/stream_compression_identity.c +++ b/src/core/lib/compression/stream_compression_identity.c @@ -59,6 +59,7 @@ static bool grpc_stream_decompress_identity( return false; } grpc_stream_compression_pass_through(in, out, output_size, max_output_size); + *end_of_context = false; return true; }