From 8e14acc76bacdf67fa852a96e44381df238e3907 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 5 Sep 2017 13:39:07 -0700 Subject: [PATCH] Remove duplicate sentences on send path --- .../chttp2/transport/chttp2_transport.c | 14 ++- .../ext/transport/chttp2/transport/internal.h | 9 +- .../ext/transport/chttp2/transport/writing.c | 89 ++++++++----------- 3 files changed, 45 insertions(+), 67 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 0e00995d11d..e1fb9dd9154 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -727,10 +727,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_destroy_internal(exec_ctx, &s->unprocessed_incoming_frames_buffer); grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); - if (s->compressed_data_buffer) { - grpc_slice_buffer_destroy_internal(exec_ctx, s->compressed_data_buffer); - gpr_free(s->compressed_data_buffer); - } + grpc_slice_buffer_destroy_internal(exec_ctx, &s->compressed_data_buffer); if (s->decompressed_data_buffer) { grpc_slice_buffer_destroy_internal(exec_ctx, s->decompressed_data_buffer); gpr_free(s->decompressed_data_buffer); @@ -1300,12 +1297,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; /* Identify stream compression */ - if ((s->stream_compression_send_enabled = - (op_payload->send_initial_metadata.send_initial_metadata->idx.named - .content_encoding != NULL)) == true) { - s->compressed_data_buffer = gpr_malloc(sizeof(grpc_slice_buffer)); - grpc_slice_buffer_init(s->compressed_data_buffer); + if (op_payload->send_initial_metadata.send_initial_metadata->idx.named.content_encoding == NULL || + grpc_stream_compression_method_parse(GRPC_MDVALUE(op_payload->send_initial_metadata.send_initial_metadata->idx.named.content_encoding->md), true, &s->stream_compression_method) == 0) { + s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS; } + grpc_slice_buffer_init(&s->compressed_data_buffer); s->send_initial_metadata_finished = add_closure_barrier(on_complete); s->send_initial_metadata = diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 3c41a8958f6..e5e4dd33443 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -557,10 +557,11 @@ struct grpc_chttp2_stream { grpc_chttp2_write_cb *finish_after_write; size_t sending_bytes; - /** Whether stream compression send is enabled */ + /* Stream compression method to be used. */ + grpc_stream_compression_method stream_compression_method; + /* Stream decompression method to be used. */ + grpc_stream_compression_method stream_decompression_method; bool stream_compression_recv_enabled; - /** Whether stream compression recv is enabled */ - bool stream_compression_send_enabled; /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed */ bool unprocessed_incoming_frames_decompressed; @@ -570,7 +571,7 @@ struct grpc_chttp2_stream { grpc_stream_compression_context *stream_compression_ctx; /** Buffer storing data that is compressed but not sent */ - grpc_slice_buffer *compressed_data_buffer; + grpc_slice_buffer compressed_data_buffer; /** Amount of uncompressed bytes sent out when compressed_data_buffer is * emptied */ size_t uncompressed_data_size; diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index b877e9f1264..b6bc864c29d 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -288,8 +288,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( if (sent_initial_metadata) { /* send any body bytes, if allowed by flow control */ if (s->flow_controlled_buffer.length > 0 || - (s->stream_compression_send_enabled && - s->compressed_data_buffer->length > 0)) { + s->compressed_data_buffer.length > 0) { uint32_t stream_remote_window = (uint32_t)GPR_MAX( 0, s->flow_control.remote_window_delta + @@ -302,56 +301,40 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( if (max_outgoing > 0) { bool is_last_data_frame = false; bool is_last_frame = false; - if (s->stream_compression_send_enabled) { - while ((s->flow_controlled_buffer.length > 0 || - s->compressed_data_buffer->length > 0) && - max_outgoing > 0) { - if (s->compressed_data_buffer->length > 0) { - uint32_t send_bytes = (uint32_t)GPR_MIN( - max_outgoing, s->compressed_data_buffer->length); - is_last_data_frame = - (send_bytes == s->compressed_data_buffer->length && - s->flow_controlled_buffer.length == 0 && - s->fetching_send_message == NULL); - is_last_frame = - is_last_data_frame && s->send_trailing_metadata != NULL && - grpc_metadata_batch_is_empty(s->send_trailing_metadata); - grpc_chttp2_encode_data(s->id, s->compressed_data_buffer, - send_bytes, is_last_frame, - &s->stats.outgoing, &t->outbuf); - grpc_chttp2_flowctl_sent_data(&t->flow_control, - &s->flow_control, send_bytes); - max_outgoing -= send_bytes; - if (s->compressed_data_buffer->length == 0) { - s->sending_bytes += s->uncompressed_data_size; - } - } else { - if (s->stream_compression_ctx == NULL) { - s->stream_compression_ctx = - grpc_stream_compression_context_create( - GRPC_STREAM_COMPRESSION_GZIP_COMPRESS); - } - s->uncompressed_data_size = s->flow_controlled_buffer.length; - GPR_ASSERT(grpc_stream_compress( - s->stream_compression_ctx, &s->flow_controlled_buffer, - s->compressed_data_buffer, NULL, MAX_SIZE_T, - GRPC_STREAM_COMPRESSION_FLUSH_SYNC)); + while ((s->flow_controlled_buffer.length > 0 || + s->compressed_data_buffer.length > 0) && + max_outgoing > 0) { + if (s->compressed_data_buffer.length > 0) { + uint32_t send_bytes = (uint32_t)GPR_MIN( + max_outgoing, s->compressed_data_buffer.length); + is_last_data_frame = + (send_bytes == s->compressed_data_buffer.length && + s->flow_controlled_buffer.length == 0 && + s->fetching_send_message == NULL); + is_last_frame = + is_last_data_frame && s->send_trailing_metadata != NULL && + grpc_metadata_batch_is_empty(s->send_trailing_metadata); + grpc_chttp2_encode_data(s->id, &s->compressed_data_buffer, + send_bytes, is_last_frame, + &s->stats.outgoing, &t->outbuf); + grpc_chttp2_flowctl_sent_data(&t->flow_control, + &s->flow_control, send_bytes); + max_outgoing -= send_bytes; + if (s->compressed_data_buffer.length == 0) { + s->sending_bytes += s->uncompressed_data_size; } + } else { + if (s->stream_compression_ctx == NULL) { + s->stream_compression_ctx = + grpc_stream_compression_context_create( + s->stream_compression_method); + } + s->uncompressed_data_size = s->flow_controlled_buffer.length; + GPR_ASSERT(grpc_stream_compress( + s->stream_compression_ctx, &s->flow_controlled_buffer, + &s->compressed_data_buffer, NULL, MAX_SIZE_T, + GRPC_STREAM_COMPRESSION_FLUSH_SYNC)); } - } else { - uint32_t send_bytes = (uint32_t)GPR_MIN( - max_outgoing, s->flow_controlled_buffer.length); - is_last_data_frame = s->fetching_send_message == NULL && - send_bytes == s->flow_controlled_buffer.length; - is_last_frame = - is_last_data_frame && s->send_trailing_metadata != NULL && - grpc_metadata_batch_is_empty(s->send_trailing_metadata); - grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, - send_bytes, is_last_frame, - &s->stats.outgoing, &t->outbuf); - grpc_chttp2_flowctl_sent_data(&t->flow_control, &s->flow_control, - send_bytes); - s->sending_bytes += send_bytes; } t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; @@ -371,8 +354,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } now_writing = true; if (s->flow_controlled_buffer.length > 0 || - (s->stream_compression_send_enabled && - s->compressed_data_buffer->length > 0)) { + s->compressed_data_buffer.length > 0) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t, s); } @@ -387,8 +369,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( if (s->send_trailing_metadata != NULL && s->fetching_send_message == NULL && s->flow_controlled_buffer.length == 0 && - (!s->stream_compression_send_enabled || - s->compressed_data_buffer->length == 0)) { + s->compressed_data_buffer.length == 0) { GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata")); if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) { grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true,