diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index ad0521cc42f..2a26bbb0c8b 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -696,6 +696,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_slice_buffer_init(&s->compressed_data_buffer); grpc_slice_buffer_init(&s->decompressed_data_buffer); s->pending_byte_stream = false; + s->decompressed_header_bytes = 0; GRPC_CLOSURE_INIT(&s->reset_byte_stream, reset_byte_stream, s, grpc_combiner_scheduler(t->combiner)); @@ -1734,7 +1735,7 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, if (!grpc_stream_decompress(s->stream_decompression_ctx, &s->unprocessed_incoming_frames_buffer, &s->decompressed_data_buffer, NULL, - GRPC_HEADER_SIZE_IN_BYTES, + GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes, &end_of_context)) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); @@ -1743,6 +1744,10 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Stream decompression error."); } else { + s->decompressed_header_bytes += s->decompressed_data_buffer.length; + if (s->decompressed_header_bytes == GRPC_HEADER_SIZE_IN_BYTES) { + s->decompressed_header_bytes = 0; + } error = grpc_deframe_unprocessed_incoming_frames( exec_ctx, &s->data_parser, s, &s->decompressed_data_buffer, NULL, s->recv_message); diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index 222d2177b29..73aaab18025 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -210,7 +210,7 @@ grpc_error *grpc_deframe_unprocessed_incoming_frames( if (cur != end) { grpc_slice_buffer_undo_take_first( - &s->unprocessed_incoming_frames_buffer, + slices, grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); } grpc_slice_unref_internal(exec_ctx, slice); @@ -277,7 +277,7 @@ grpc_error *grpc_deframe_unprocessed_incoming_frames( p->state = GRPC_CHTTP2_DATA_FH_0; cur += p->frame_size; grpc_slice_buffer_undo_take_first( - &s->unprocessed_incoming_frames_buffer, + slices, grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); grpc_slice_unref_internal(exec_ctx, slice); return GRPC_ERROR_NONE; diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 33cc962f64e..b64e9a0cd65 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -576,6 +576,8 @@ struct grpc_chttp2_stream { /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed */ bool unprocessed_incoming_frames_decompressed; + /** gRPC header bytes that are already decompressed */ + size_t decompressed_header_bytes; }; /** Transport writing call flow: