diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index d4188775722..5ced5829cc1 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -679,8 +679,6 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_slice_buffer_init(&frame_storage); grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer); grpc_slice_buffer_init(&flow_controlled_buffer); - grpc_slice_buffer_init(&compressed_data_buffer); - grpc_slice_buffer_init(&decompressed_data_buffer); GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this, grpc_combiner_scheduler(t->combiner)); @@ -704,8 +702,13 @@ grpc_chttp2_stream::~grpc_chttp2_stream() { grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer); grpc_slice_buffer_destroy_internal(&frame_storage); - grpc_slice_buffer_destroy_internal(&compressed_data_buffer); - grpc_slice_buffer_destroy_internal(&decompressed_data_buffer); + if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) { + grpc_slice_buffer_destroy_internal(&compressed_data_buffer); + } + if (stream_decompression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { + grpc_slice_buffer_destroy_internal(&decompressed_data_buffer); + } grpc_chttp2_list_remove_stalled_by_transport(t, this); grpc_chttp2_list_remove_stalled_by_stream(t, this); @@ -759,12 +762,15 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs, GPR_TIMER_SCOPE("destroy_stream", 0); grpc_chttp2_transport* t = reinterpret_cast(gt); grpc_chttp2_stream* s = reinterpret_cast(gs); - - if (s->stream_compression_ctx != nullptr) { + if (s->stream_compression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS && + s->stream_compression_ctx != nullptr) { grpc_stream_compression_context_destroy(s->stream_compression_ctx); s->stream_compression_ctx = nullptr; } - if (s->stream_decompression_ctx != nullptr) { + if (s->stream_decompression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS && + s->stream_decompression_ctx != nullptr) { grpc_stream_compression_context_destroy(s->stream_decompression_ctx); s->stream_decompression_ctx = nullptr; } @@ -1442,7 +1448,12 @@ static void perform_stream_op_locked(void* stream_op, true, &s->stream_compression_method) == 0) { s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS; } - + if (s->stream_compression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) { + s->uncompressed_data_size = 0; + s->stream_compression_ctx = nullptr; + grpc_slice_buffer_init(&s->compressed_data_buffer); + } s->send_initial_metadata_finished = add_closure_barrier(on_complete); s->send_initial_metadata = op_payload->send_initial_metadata.send_initial_metadata; @@ -1998,27 +2009,39 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t, !s->seen_error && s->recv_trailing_metadata_finished != nullptr) { /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and * maybe decompress the next 5 bytes in the stream. */ - bool end_of_context; - if (!s->stream_decompression_ctx) { - s->stream_decompression_ctx = grpc_stream_compression_context_create( - s->stream_decompression_method); - } - if (!grpc_stream_decompress( - s->stream_decompression_ctx, &s->frame_storage, - &s->unprocessed_incoming_frames_buffer, nullptr, - GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) { - grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); - grpc_slice_buffer_reset_and_unref_internal( - &s->unprocessed_incoming_frames_buffer); - s->seen_error = true; - } else { + if (s->stream_decompression_method == + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { + grpc_slice_buffer_move_first(&s->frame_storage, + GRPC_HEADER_SIZE_IN_BYTES, + &s->unprocessed_incoming_frames_buffer); if (s->unprocessed_incoming_frames_buffer.length > 0) { s->unprocessed_incoming_frames_decompressed = true; pending_data = true; } - if (end_of_context) { - grpc_stream_compression_context_destroy(s->stream_decompression_ctx); - s->stream_decompression_ctx = nullptr; + } else { + bool end_of_context; + if (!s->stream_decompression_ctx) { + s->stream_decompression_ctx = grpc_stream_compression_context_create( + s->stream_decompression_method); + } + if (!grpc_stream_decompress( + s->stream_decompression_ctx, &s->frame_storage, + &s->unprocessed_incoming_frames_buffer, nullptr, + GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) { + grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal( + &s->unprocessed_incoming_frames_buffer); + s->seen_error = true; + } else { + if (s->unprocessed_incoming_frames_buffer.length > 0) { + s->unprocessed_incoming_frames_decompressed = true; + pending_data = true; + } + if (end_of_context) { + grpc_stream_compression_context_destroy( + s->stream_decompression_ctx); + s->stream_decompression_ctx = nullptr; + } } } } @@ -2941,6 +2964,8 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint, } void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() { + GPR_DEBUG_ASSERT(stream_->stream_decompression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS); if (!stream_->stream_decompression_ctx) { stream_->stream_decompression_ctx = grpc_stream_compression_context_create( stream_->stream_decompression_method); @@ -2951,7 +2976,9 @@ grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) { GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0); grpc_error* error; if (stream_->unprocessed_incoming_frames_buffer.length > 0) { - if (!stream_->unprocessed_incoming_frames_decompressed) { + if (!stream_->unprocessed_incoming_frames_decompressed && + stream_->stream_decompression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { bool end_of_context; MaybeCreateStreamDecompressionCtx(); if (!grpc_stream_decompress(stream_->stream_decompression_ctx, diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc index 5bcdb4e2326..7a37d37fd10 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc @@ -1616,6 +1616,12 @@ static void parse_stream_compression_md(grpc_chttp2_transport* t, s->stream_decompression_method = GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS; } + + if (s->stream_decompression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) { + s->stream_decompression_ctx = nullptr; + grpc_slice_buffer_init(&s->decompressed_data_buffer); + } } grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 00b1fe18b28..0a55ee111ee 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -583,10 +583,6 @@ struct grpc_chttp2_stream { grpc_slice_buffer frame_storage; /* protected by t combiner */ - /* Accessed only by transport thread when stream->pending_byte_stream == false - * Accessed only by application thread when stream->pending_byte_stream == - * true */ - grpc_slice_buffer unprocessed_incoming_frames_buffer; grpc_closure* on_next = nullptr; /* protected by t combiner */ bool pending_byte_stream = false; /* protected by t combiner */ // cached length of buffer to be used by the transport thread in cases where @@ -594,6 +590,10 @@ struct grpc_chttp2_stream { // application threads are allowed to modify // unprocessed_incoming_frames_buffer size_t unprocessed_incoming_frames_buffer_cached_length = 0; + /* Accessed only by transport thread when stream->pending_byte_stream == false + * Accessed only by application thread when stream->pending_byte_stream == + * true */ + grpc_slice_buffer unprocessed_incoming_frames_buffer; grpc_closure reset_byte_stream; grpc_error* byte_stream_error = GRPC_ERROR_NONE; /* protected by t combiner */ bool received_last_frame = false; /* protected by t combiner */ @@ -634,18 +634,7 @@ struct grpc_chttp2_stream { /* Stream decompression method to be used. */ grpc_stream_compression_method stream_decompression_method = GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS; - /** Stream compression decompress context */ - grpc_stream_compression_context* stream_decompression_ctx = nullptr; - /** Stream compression compress context */ - grpc_stream_compression_context* stream_compression_ctx = nullptr; - /** Buffer storing data that is compressed but not sent */ - grpc_slice_buffer compressed_data_buffer; - /** Amount of uncompressed bytes sent out when compressed_data_buffer is - * emptied */ - size_t uncompressed_data_size = 0; - /** Temporary buffer storing decompressed data */ - grpc_slice_buffer decompressed_data_buffer; /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed */ bool unprocessed_incoming_frames_decompressed = false; @@ -655,6 +644,22 @@ struct grpc_chttp2_stream { size_t decompressed_header_bytes = 0; /** Byte counter for number of bytes written */ size_t byte_counter = 0; + + /** Amount of uncompressed bytes sent out when compressed_data_buffer is + * emptied */ + size_t uncompressed_data_size; + /** Stream compression compress context */ + grpc_stream_compression_context* stream_compression_ctx; + /** Buffer storing data that is compressed but not sent */ + grpc_slice_buffer compressed_data_buffer; + + /** Stream compression decompress context */ + grpc_stream_compression_context* stream_decompression_ctx; + /** Temporary buffer storing decompressed data. + * Initialized, used, and destroyed only when stream uses (non-identity) + * compression. + */ + grpc_slice_buffer decompressed_data_buffer; }; /** Transport writing call flow: diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 3d1db0aa144..90015bd97ec 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -25,6 +25,7 @@ #include +#include "src/core/lib/compression/stream_compression.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" @@ -150,7 +151,11 @@ static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s, ":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64 ":s_win=%d:s_delta=%" PRId64 "]", t->peer_string, t, s->id, staller, s->flow_controlled_buffer.length, - s->compressed_data_buffer.length, s->flow_controlled_bytes_flowed, + s->stream_compression_method == + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS + ? 0 + : s->compressed_data_buffer.length, + s->flow_controlled_bytes_flowed, t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], t->flow_control->remote_window(), @@ -325,7 +330,23 @@ class DataSendContext { bool AnyOutgoing() const { return max_outgoing() > 0; } + void FlushUncompressedBytes() { + uint32_t send_bytes = static_cast GPR_MIN( + max_outgoing(), s_->flow_controlled_buffer.length); + is_last_frame_ = send_bytes == s_->flow_controlled_buffer.length && + s_->fetching_send_message == nullptr && + s_->send_trailing_metadata != nullptr && + grpc_metadata_batch_is_empty(s_->send_trailing_metadata); + grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes, + is_last_frame_, &s_->stats.outgoing, &t_->outbuf); + s_->flow_control->SentData(send_bytes); + s_->sending_bytes += send_bytes; + } + void FlushCompressedBytes() { + GPR_DEBUG_ASSERT(s_->stream_compression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS); + uint32_t send_bytes = static_cast GPR_MIN( max_outgoing(), s_->compressed_data_buffer.length); bool is_last_data_frame = @@ -360,6 +381,9 @@ class DataSendContext { } void CompressMoreBytes() { + GPR_DEBUG_ASSERT(s_->stream_compression_method != + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS); + if (s_->stream_compression_ctx == nullptr) { s_->stream_compression_ctx = grpc_stream_compression_context_create(s_->stream_compression_method); @@ -417,7 +441,7 @@ class StreamWriteContext { // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid if (!t_->is_client && s_->fetching_send_message == nullptr && s_->flow_controlled_buffer.length == 0 && - s_->compressed_data_buffer.length == 0 && + compressed_data_buffer_len() == 0 && s_->send_trailing_metadata != nullptr && is_default_initial_metadata(s_->send_initial_metadata)) { ConvertInitialMetadataToTrailingMetadata(); @@ -446,6 +470,13 @@ class StreamWriteContext { "send_initial_metadata_finished"); } + bool compressed_data_buffer_len() { + return s_->stream_compression_method == + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS + ? 0 + : s_->compressed_data_buffer.length; + } + void FlushWindowUpdates() { /* send any window updates */ const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate(); @@ -462,7 +493,7 @@ class StreamWriteContext { if (!s_->sent_initial_metadata) return; if (s_->flow_controlled_buffer.length == 0 && - s_->compressed_data_buffer.length == 0) { + compressed_data_buffer_len() == 0) { return; // early out: nothing to do } @@ -479,13 +510,21 @@ class StreamWriteContext { return; // early out: nothing to do } - while ((s_->flow_controlled_buffer.length > 0 || - s_->compressed_data_buffer.length > 0) && - data_send_context.max_outgoing() > 0) { - if (s_->compressed_data_buffer.length > 0) { - data_send_context.FlushCompressedBytes(); - } else { - data_send_context.CompressMoreBytes(); + if (s_->stream_compression_method == + GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) { + while (s_->flow_controlled_buffer.length > 0 && + data_send_context.max_outgoing() > 0) { + data_send_context.FlushUncompressedBytes(); + } + } else { + while ((s_->flow_controlled_buffer.length > 0 || + s_->compressed_data_buffer.length > 0) && + data_send_context.max_outgoing() > 0) { + if (s_->compressed_data_buffer.length > 0) { + data_send_context.FlushCompressedBytes(); + } else { + data_send_context.CompressMoreBytes(); + } } } write_context_->ResetPingClock(); @@ -495,7 +534,7 @@ class StreamWriteContext { data_send_context.CallCallbacks(); stream_became_writable_ = true; if (s_->flow_controlled_buffer.length > 0 || - s_->compressed_data_buffer.length > 0) { + compressed_data_buffer_len() > 0) { GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t_, s_); } @@ -508,7 +547,7 @@ class StreamWriteContext { if (s_->send_trailing_metadata == nullptr) return; if (s_->fetching_send_message != nullptr) return; if (s_->flow_controlled_buffer.length != 0) return; - if (s_->compressed_data_buffer.length != 0) return; + if (compressed_data_buffer_len() != 0) return; GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata")); if (grpc_metadata_batch_is_empty(s_->send_trailing_metadata)) {