Merge pull request #18925 from soheilhy/http2-compression

Use compress and decompress slice_buffers only when they are needed.
pull/18971/head
Soheil Hassas Yeganeh 6 years ago committed by GitHub
commit 1b23280277
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 79
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 6
      src/core/ext/transport/chttp2/transport/hpack_parser.cc
  3. 35
      src/core/ext/transport/chttp2/transport/internal.h
  4. 63
      src/core/ext/transport/chttp2/transport/writing.cc

@ -679,8 +679,6 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
grpc_slice_buffer_init(&frame_storage); grpc_slice_buffer_init(&frame_storage);
grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer); grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer);
grpc_slice_buffer_init(&flow_controlled_buffer); grpc_slice_buffer_init(&flow_controlled_buffer);
grpc_slice_buffer_init(&compressed_data_buffer);
grpc_slice_buffer_init(&decompressed_data_buffer);
GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this, GRPC_CLOSURE_INIT(&complete_fetch_locked, ::complete_fetch_locked, this,
grpc_combiner_scheduler(t->combiner)); grpc_combiner_scheduler(t->combiner));
@ -704,8 +702,13 @@ grpc_chttp2_stream::~grpc_chttp2_stream() {
grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer); grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer);
grpc_slice_buffer_destroy_internal(&frame_storage); grpc_slice_buffer_destroy_internal(&frame_storage);
grpc_slice_buffer_destroy_internal(&compressed_data_buffer); if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
grpc_slice_buffer_destroy_internal(&decompressed_data_buffer); grpc_slice_buffer_destroy_internal(&compressed_data_buffer);
}
if (stream_decompression_method !=
GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
grpc_slice_buffer_destroy_internal(&decompressed_data_buffer);
}
grpc_chttp2_list_remove_stalled_by_transport(t, this); grpc_chttp2_list_remove_stalled_by_transport(t, this);
grpc_chttp2_list_remove_stalled_by_stream(t, this); grpc_chttp2_list_remove_stalled_by_stream(t, this);
@ -759,12 +762,15 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
GPR_TIMER_SCOPE("destroy_stream", 0); GPR_TIMER_SCOPE("destroy_stream", 0);
grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt); grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs); grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
if (s->stream_compression_method !=
if (s->stream_compression_ctx != nullptr) { GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS &&
s->stream_compression_ctx != nullptr) {
grpc_stream_compression_context_destroy(s->stream_compression_ctx); grpc_stream_compression_context_destroy(s->stream_compression_ctx);
s->stream_compression_ctx = nullptr; s->stream_compression_ctx = nullptr;
} }
if (s->stream_decompression_ctx != nullptr) { if (s->stream_decompression_method !=
GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS &&
s->stream_decompression_ctx != nullptr) {
grpc_stream_compression_context_destroy(s->stream_decompression_ctx); grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
s->stream_decompression_ctx = nullptr; s->stream_decompression_ctx = nullptr;
} }
@ -1442,7 +1448,12 @@ static void perform_stream_op_locked(void* stream_op,
true, &s->stream_compression_method) == 0) { true, &s->stream_compression_method) == 0) {
s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS; s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
} }
if (s->stream_compression_method !=
GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
s->uncompressed_data_size = 0;
s->stream_compression_ctx = nullptr;
grpc_slice_buffer_init(&s->compressed_data_buffer);
}
s->send_initial_metadata_finished = add_closure_barrier(on_complete); s->send_initial_metadata_finished = add_closure_barrier(on_complete);
s->send_initial_metadata = s->send_initial_metadata =
op_payload->send_initial_metadata.send_initial_metadata; op_payload->send_initial_metadata.send_initial_metadata;
@ -1998,27 +2009,39 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
!s->seen_error && s->recv_trailing_metadata_finished != nullptr) { !s->seen_error && s->recv_trailing_metadata_finished != nullptr) {
/* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
* maybe decompress the next 5 bytes in the stream. */ * maybe decompress the next 5 bytes in the stream. */
bool end_of_context; if (s->stream_decompression_method ==
if (!s->stream_decompression_ctx) { GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
s->stream_decompression_ctx = grpc_stream_compression_context_create( grpc_slice_buffer_move_first(&s->frame_storage,
s->stream_decompression_method); GRPC_HEADER_SIZE_IN_BYTES,
} &s->unprocessed_incoming_frames_buffer);
if (!grpc_stream_decompress(
s->stream_decompression_ctx, &s->frame_storage,
&s->unprocessed_incoming_frames_buffer, nullptr,
GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
grpc_slice_buffer_reset_and_unref_internal(
&s->unprocessed_incoming_frames_buffer);
s->seen_error = true;
} else {
if (s->unprocessed_incoming_frames_buffer.length > 0) { if (s->unprocessed_incoming_frames_buffer.length > 0) {
s->unprocessed_incoming_frames_decompressed = true; s->unprocessed_incoming_frames_decompressed = true;
pending_data = true; pending_data = true;
} }
if (end_of_context) { } else {
grpc_stream_compression_context_destroy(s->stream_decompression_ctx); bool end_of_context;
s->stream_decompression_ctx = nullptr; if (!s->stream_decompression_ctx) {
s->stream_decompression_ctx = grpc_stream_compression_context_create(
s->stream_decompression_method);
}
if (!grpc_stream_decompress(
s->stream_decompression_ctx, &s->frame_storage,
&s->unprocessed_incoming_frames_buffer, nullptr,
GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
grpc_slice_buffer_reset_and_unref_internal(
&s->unprocessed_incoming_frames_buffer);
s->seen_error = true;
} else {
if (s->unprocessed_incoming_frames_buffer.length > 0) {
s->unprocessed_incoming_frames_decompressed = true;
pending_data = true;
}
if (end_of_context) {
grpc_stream_compression_context_destroy(
s->stream_decompression_ctx);
s->stream_decompression_ctx = nullptr;
}
} }
} }
} }
@ -2941,6 +2964,8 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
} }
void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() { void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() {
GPR_DEBUG_ASSERT(stream_->stream_decompression_method !=
GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS);
if (!stream_->stream_decompression_ctx) { if (!stream_->stream_decompression_ctx) {
stream_->stream_decompression_ctx = grpc_stream_compression_context_create( stream_->stream_decompression_ctx = grpc_stream_compression_context_create(
stream_->stream_decompression_method); stream_->stream_decompression_method);
@ -2951,7 +2976,9 @@ grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0); GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
grpc_error* error; grpc_error* error;
if (stream_->unprocessed_incoming_frames_buffer.length > 0) { if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
if (!stream_->unprocessed_incoming_frames_decompressed) { if (!stream_->unprocessed_incoming_frames_decompressed &&
stream_->stream_decompression_method !=
GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
bool end_of_context; bool end_of_context;
MaybeCreateStreamDecompressionCtx(); MaybeCreateStreamDecompressionCtx();
if (!grpc_stream_decompress(stream_->stream_decompression_ctx, if (!grpc_stream_decompress(stream_->stream_decompression_ctx,

@ -1616,6 +1616,12 @@ static void parse_stream_compression_md(grpc_chttp2_transport* t,
s->stream_decompression_method = s->stream_decompression_method =
GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS; GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS;
} }
if (s->stream_decompression_method !=
GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
s->stream_decompression_ctx = nullptr;
grpc_slice_buffer_init(&s->decompressed_data_buffer);
}
} }
grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser, grpc_error* grpc_chttp2_header_parser_parse(void* hpack_parser,

@ -583,10 +583,6 @@ struct grpc_chttp2_stream {
grpc_slice_buffer frame_storage; /* protected by t combiner */ grpc_slice_buffer frame_storage; /* protected by t combiner */
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
* true */
grpc_slice_buffer unprocessed_incoming_frames_buffer;
grpc_closure* on_next = nullptr; /* protected by t combiner */ grpc_closure* on_next = nullptr; /* protected by t combiner */
bool pending_byte_stream = false; /* protected by t combiner */ bool pending_byte_stream = false; /* protected by t combiner */
// cached length of buffer to be used by the transport thread in cases where // cached length of buffer to be used by the transport thread in cases where
@ -594,6 +590,10 @@ struct grpc_chttp2_stream {
// application threads are allowed to modify // application threads are allowed to modify
// unprocessed_incoming_frames_buffer // unprocessed_incoming_frames_buffer
size_t unprocessed_incoming_frames_buffer_cached_length = 0; size_t unprocessed_incoming_frames_buffer_cached_length = 0;
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
* true */
grpc_slice_buffer unprocessed_incoming_frames_buffer;
grpc_closure reset_byte_stream; grpc_closure reset_byte_stream;
grpc_error* byte_stream_error = GRPC_ERROR_NONE; /* protected by t combiner */ grpc_error* byte_stream_error = GRPC_ERROR_NONE; /* protected by t combiner */
bool received_last_frame = false; /* protected by t combiner */ bool received_last_frame = false; /* protected by t combiner */
@ -634,18 +634,7 @@ struct grpc_chttp2_stream {
/* Stream decompression method to be used. */ /* Stream decompression method to be used. */
grpc_stream_compression_method stream_decompression_method = grpc_stream_compression_method stream_decompression_method =
GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS; GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS;
/** Stream compression decompress context */
grpc_stream_compression_context* stream_decompression_ctx = nullptr;
/** Stream compression compress context */
grpc_stream_compression_context* stream_compression_ctx = nullptr;
/** Buffer storing data that is compressed but not sent */
grpc_slice_buffer compressed_data_buffer;
/** Amount of uncompressed bytes sent out when compressed_data_buffer is
* emptied */
size_t uncompressed_data_size = 0;
/** Temporary buffer storing decompressed data */
grpc_slice_buffer decompressed_data_buffer;
/** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed
*/ */
bool unprocessed_incoming_frames_decompressed = false; bool unprocessed_incoming_frames_decompressed = false;
@ -655,6 +644,22 @@ struct grpc_chttp2_stream {
size_t decompressed_header_bytes = 0; size_t decompressed_header_bytes = 0;
/** Byte counter for number of bytes written */ /** Byte counter for number of bytes written */
size_t byte_counter = 0; size_t byte_counter = 0;
/** Amount of uncompressed bytes sent out when compressed_data_buffer is
* emptied */
size_t uncompressed_data_size;
/** Stream compression compress context */
grpc_stream_compression_context* stream_compression_ctx;
/** Buffer storing data that is compressed but not sent */
grpc_slice_buffer compressed_data_buffer;
/** Stream compression decompress context */
grpc_stream_compression_context* stream_decompression_ctx;
/** Temporary buffer storing decompressed data.
* Initialized, used, and destroyed only when stream uses (non-identity)
* compression.
*/
grpc_slice_buffer decompressed_data_buffer;
}; };
/** Transport writing call flow: /** Transport writing call flow:

@ -25,6 +25,7 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/lib/compression/stream_compression.h"
#include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
@ -150,7 +151,11 @@ static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64 ":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64
":s_win=%d:s_delta=%" PRId64 "]", ":s_win=%d:s_delta=%" PRId64 "]",
t->peer_string, t, s->id, staller, s->flow_controlled_buffer.length, t->peer_string, t, s->id, staller, s->flow_controlled_buffer.length,
s->compressed_data_buffer.length, s->flow_controlled_bytes_flowed, s->stream_compression_method ==
GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
? 0
: s->compressed_data_buffer.length,
s->flow_controlled_bytes_flowed,
t->settings[GRPC_ACKED_SETTINGS] t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
t->flow_control->remote_window(), t->flow_control->remote_window(),
@ -325,7 +330,23 @@ class DataSendContext {
bool AnyOutgoing() const { return max_outgoing() > 0; } bool AnyOutgoing() const { return max_outgoing() > 0; }
void FlushUncompressedBytes() {
uint32_t send_bytes = static_cast<uint32_t> GPR_MIN(
max_outgoing(), s_->flow_controlled_buffer.length);
is_last_frame_ = send_bytes == s_->flow_controlled_buffer.length &&
s_->fetching_send_message == nullptr &&
s_->send_trailing_metadata != nullptr &&
grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes,
is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
s_->flow_control->SentData(send_bytes);
s_->sending_bytes += send_bytes;
}
void FlushCompressedBytes() { void FlushCompressedBytes() {
GPR_DEBUG_ASSERT(s_->stream_compression_method !=
GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
uint32_t send_bytes = static_cast<uint32_t> GPR_MIN( uint32_t send_bytes = static_cast<uint32_t> GPR_MIN(
max_outgoing(), s_->compressed_data_buffer.length); max_outgoing(), s_->compressed_data_buffer.length);
bool is_last_data_frame = bool is_last_data_frame =
@ -360,6 +381,9 @@ class DataSendContext {
} }
void CompressMoreBytes() { void CompressMoreBytes() {
GPR_DEBUG_ASSERT(s_->stream_compression_method !=
GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
if (s_->stream_compression_ctx == nullptr) { if (s_->stream_compression_ctx == nullptr) {
s_->stream_compression_ctx = s_->stream_compression_ctx =
grpc_stream_compression_context_create(s_->stream_compression_method); grpc_stream_compression_context_create(s_->stream_compression_method);
@ -417,7 +441,7 @@ class StreamWriteContext {
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
if (!t_->is_client && s_->fetching_send_message == nullptr && if (!t_->is_client && s_->fetching_send_message == nullptr &&
s_->flow_controlled_buffer.length == 0 && s_->flow_controlled_buffer.length == 0 &&
s_->compressed_data_buffer.length == 0 && compressed_data_buffer_len() == 0 &&
s_->send_trailing_metadata != nullptr && s_->send_trailing_metadata != nullptr &&
is_default_initial_metadata(s_->send_initial_metadata)) { is_default_initial_metadata(s_->send_initial_metadata)) {
ConvertInitialMetadataToTrailingMetadata(); ConvertInitialMetadataToTrailingMetadata();
@ -446,6 +470,13 @@ class StreamWriteContext {
"send_initial_metadata_finished"); "send_initial_metadata_finished");
} }
bool compressed_data_buffer_len() {
return s_->stream_compression_method ==
GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
? 0
: s_->compressed_data_buffer.length;
}
void FlushWindowUpdates() { void FlushWindowUpdates() {
/* send any window updates */ /* send any window updates */
const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate(); const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate();
@ -462,7 +493,7 @@ class StreamWriteContext {
if (!s_->sent_initial_metadata) return; if (!s_->sent_initial_metadata) return;
if (s_->flow_controlled_buffer.length == 0 && if (s_->flow_controlled_buffer.length == 0 &&
s_->compressed_data_buffer.length == 0) { compressed_data_buffer_len() == 0) {
return; // early out: nothing to do return; // early out: nothing to do
} }
@ -479,13 +510,21 @@ class StreamWriteContext {
return; // early out: nothing to do return; // early out: nothing to do
} }
while ((s_->flow_controlled_buffer.length > 0 || if (s_->stream_compression_method ==
s_->compressed_data_buffer.length > 0) && GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
data_send_context.max_outgoing() > 0) { while (s_->flow_controlled_buffer.length > 0 &&
if (s_->compressed_data_buffer.length > 0) { data_send_context.max_outgoing() > 0) {
data_send_context.FlushCompressedBytes(); data_send_context.FlushUncompressedBytes();
} else { }
data_send_context.CompressMoreBytes(); } else {
while ((s_->flow_controlled_buffer.length > 0 ||
s_->compressed_data_buffer.length > 0) &&
data_send_context.max_outgoing() > 0) {
if (s_->compressed_data_buffer.length > 0) {
data_send_context.FlushCompressedBytes();
} else {
data_send_context.CompressMoreBytes();
}
} }
} }
write_context_->ResetPingClock(); write_context_->ResetPingClock();
@ -495,7 +534,7 @@ class StreamWriteContext {
data_send_context.CallCallbacks(); data_send_context.CallCallbacks();
stream_became_writable_ = true; stream_became_writable_ = true;
if (s_->flow_controlled_buffer.length > 0 || if (s_->flow_controlled_buffer.length > 0 ||
s_->compressed_data_buffer.length > 0) { compressed_data_buffer_len() > 0) {
GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork"); GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork");
grpc_chttp2_list_add_writable_stream(t_, s_); grpc_chttp2_list_add_writable_stream(t_, s_);
} }
@ -508,7 +547,7 @@ class StreamWriteContext {
if (s_->send_trailing_metadata == nullptr) return; if (s_->send_trailing_metadata == nullptr) return;
if (s_->fetching_send_message != nullptr) return; if (s_->fetching_send_message != nullptr) return;
if (s_->flow_controlled_buffer.length != 0) return; if (s_->flow_controlled_buffer.length != 0) return;
if (s_->compressed_data_buffer.length != 0) return; if (compressed_data_buffer_len() != 0) return;
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata")); GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
if (grpc_metadata_batch_is_empty(s_->send_trailing_metadata)) { if (grpc_metadata_batch_is_empty(s_->send_trailing_metadata)) {

Loading…
Cancel
Save