Transport layer send (compression) path

pull/11780/head
Muxi Yan 7 years ago
parent 68198b5744
commit e89c83a21b
  1. 6
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 11
      src/core/ext/transport/chttp2/transport/internal.h
  3. 82
      src/core/ext/transport/chttp2/transport/writing.c

@ -668,6 +668,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena); grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena);
grpc_chttp2_data_parser_init(&s->data_parser); grpc_chttp2_data_parser_init(&s->data_parser);
grpc_slice_buffer_init(&s->flow_controlled_buffer); grpc_slice_buffer_init(&s->flow_controlled_buffer);
grpc_slice_buffer_init(&s->compressed_data_buffer);
grpc_slice_buffer_init(&s->decompressed_data_buffer); grpc_slice_buffer_init(&s->decompressed_data_buffer);
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
GRPC_CLOSURE_INIT(&s->complete_fetch_locked, complete_fetch_locked, s, GRPC_CLOSURE_INIT(&s->complete_fetch_locked, complete_fetch_locked, s,
@ -707,6 +708,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_destroy_internal(exec_ctx, grpc_slice_buffer_destroy_internal(exec_ctx,
&s->unprocessed_incoming_frames_buffer); &s->unprocessed_incoming_frames_buffer);
grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
grpc_slice_buffer_destroy_internal(exec_ctx, &s->compressed_data_buffer);
grpc_slice_buffer_destroy_internal(exec_ctx, &s->decompressed_data_buffer); grpc_slice_buffer_destroy_internal(exec_ctx, &s->decompressed_data_buffer);
grpc_chttp2_list_remove_stalled_by_transport(t, s); grpc_chttp2_list_remove_stalled_by_transport(t, s);
@ -758,6 +760,10 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
if (s->stream_compression_ctx != NULL) {
grpc_stream_compression_context_destroy(s->stream_compression_ctx);
s->stream_compression_ctx = NULL;
}
if (s->stream_decompression_ctx != NULL) { if (s->stream_decompression_ctx != NULL) {
grpc_stream_compression_context_destroy(s->stream_decompression_ctx); grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
s->stream_decompression_ctx = NULL; s->stream_decompression_ctx = NULL;

@ -522,12 +522,21 @@ struct grpc_chttp2_stream {
/** Whether stream compression send is enabled or not */ /** Whether stream compression send is enabled or not */
bool stream_compression_recv_enabled; bool stream_compression_recv_enabled;
/** Whether stream compression recv is enabled or not */
bool stream_compression_send_enabled;
/** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed
*/ */
bool unprocessed_incoming_frames_decompressed; bool unprocessed_incoming_frames_decompressed;
/** Stream compression decompress context */ /** Stream compression decompress context */
grpc_stream_compression_context *stream_decompression_ctx; grpc_stream_compression_context *stream_decompression_ctx;
/** Stream compression compress context */
grpc_stream_compression_context *stream_compression_ctx;
/** Buffer storing data that is compressed but not sent */
grpc_slice_buffer compressed_data_buffer;
/** Amount of uncompressed bytes sent out when compressed_data_buffer is
* emptied */
size_t uncompressed_data_size;
/** Temporary buffer storing decompressed data */ /** Temporary buffer storing decompressed data */
grpc_slice_buffer decompressed_data_buffer; grpc_slice_buffer decompressed_data_buffer;
}; };

@ -303,7 +303,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
} }
if (sent_initial_metadata) { if (sent_initial_metadata) {
/* send any body bytes, if allowed by flow control */ /* send any body bytes, if allowed by flow control */
if (s->flow_controlled_buffer.length > 0) { if (s->flow_controlled_buffer.length > 0 ||
s->compressed_data_buffer.length > 0) {
uint32_t stream_outgoing_window = (uint32_t)GPR_MAX( uint32_t stream_outgoing_window = (uint32_t)GPR_MAX(
0, 0,
s->outgoing_window_delta + s->outgoing_window_delta +
@ -314,21 +315,63 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
GPR_MIN(stream_outgoing_window, t->outgoing_window)); GPR_MIN(stream_outgoing_window, t->outgoing_window));
if (max_outgoing > 0) { if (max_outgoing > 0) {
uint32_t send_bytes = bool is_last_data_frame;
(uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length); bool is_last_frame;
bool is_last_data_frame = if (s->stream_compression_send_enabled) {
s->fetching_send_message == NULL && while ((s->flow_controlled_buffer.length > 0 ||
send_bytes == s->flow_controlled_buffer.length; s->compressed_data_buffer.length > 0) &&
bool is_last_frame = max_outgoing > 0) {
is_last_data_frame && s->send_trailing_metadata != NULL && if (s->compressed_data_buffer.length > 0) {
grpc_metadata_batch_is_empty(s->send_trailing_metadata); uint32_t send_bytes = (uint32_t)GPR_MIN(
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes, max_outgoing, s->compressed_data_buffer.length);
is_last_frame, &s->stats.outgoing, is_last_data_frame =
&t->outbuf); (send_bytes == s->compressed_data_buffer.length &&
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta, s->flow_controlled_buffer.length == 0 &&
send_bytes); s->fetching_send_message == NULL);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, is_last_frame =
send_bytes); is_last_data_frame && s->send_trailing_metadata != NULL &&
grpc_metadata_batch_is_empty(s->send_trailing_metadata);
grpc_chttp2_encode_data(s->id, &s->compressed_data_buffer,
send_bytes, is_last_frame,
&s->stats.outgoing, &t->outbuf);
GRPC_CHTTP2_FLOW_DEBIT_STREAM(
"write", t, s, outgoing_window_delta, send_bytes);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
send_bytes);
max_outgoing -= send_bytes;
if (s->compressed_data_buffer.length == 0) {
s->sending_bytes += s->uncompressed_data_size;
}
} else {
if (s->stream_compression_ctx == NULL) {
s->stream_compression_ctx =
grpc_stream_compression_context_create(
GRPC_STREAM_COMPRESSION_COMPRESS);
}
s->uncompressed_data_size = s->flow_controlled_buffer.length;
GPR_ASSERT(grpc_stream_compress(
s->stream_compression_ctx, &s->flow_controlled_buffer,
&s->compressed_data_buffer, NULL, ~(size_t)0,
GRPC_STREAM_COMPRESSION_FLUSH_SYNC));
}
}
} else {
uint32_t send_bytes = (uint32_t)GPR_MIN(
max_outgoing, s->flow_controlled_buffer.length);
is_last_data_frame = s->fetching_send_message == NULL &&
send_bytes == s->flow_controlled_buffer.length;
is_last_frame =
is_last_data_frame && s->send_trailing_metadata != NULL &&
grpc_metadata_batch_is_empty(s->send_trailing_metadata);
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer,
send_bytes, is_last_frame,
&s->stats.outgoing, &t->outbuf);
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta,
send_bytes);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
send_bytes);
s->sending_bytes += send_bytes;
}
t->ping_state.pings_before_data_required = t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data; t->ping_policy.max_pings_without_data;
if (!t->is_client) { if (!t->is_client) {
@ -345,9 +388,9 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
&s->stats.outgoing)); &s->stats.outgoing));
} }
} }
s->sending_bytes += send_bytes;
now_writing = true; now_writing = true;
if (s->flow_controlled_buffer.length > 0) { if (s->flow_controlled_buffer.length > 0 ||
s->compressed_data_buffer.length > 0) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork");
grpc_chttp2_list_add_writable_stream(t, s); grpc_chttp2_list_add_writable_stream(t, s);
} }
@ -361,7 +404,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
} }
if (s->send_trailing_metadata != NULL && if (s->send_trailing_metadata != NULL &&
s->fetching_send_message == NULL && s->fetching_send_message == NULL &&
s->flow_controlled_buffer.length == 0) { s->flow_controlled_buffer.length == 0 &&
s->compressed_data_buffer.length == 0) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata")); GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) { if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) {
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true, grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true,

Loading…
Cancel
Save