Remove duplicate sentences on send path

pull/12399/head
Muxi Yan 7 years ago
parent bf5484e785
commit 8e14acc76b
  1. 14
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 9
      src/core/ext/transport/chttp2/transport/internal.h
  3. 89
      src/core/ext/transport/chttp2/transport/writing.c

@ -727,10 +727,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_destroy_internal(exec_ctx, grpc_slice_buffer_destroy_internal(exec_ctx,
&s->unprocessed_incoming_frames_buffer); &s->unprocessed_incoming_frames_buffer);
grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
if (s->compressed_data_buffer) { grpc_slice_buffer_destroy_internal(exec_ctx, &s->compressed_data_buffer);
grpc_slice_buffer_destroy_internal(exec_ctx, s->compressed_data_buffer);
gpr_free(s->compressed_data_buffer);
}
if (s->decompressed_data_buffer) { if (s->decompressed_data_buffer) {
grpc_slice_buffer_destroy_internal(exec_ctx, s->decompressed_data_buffer); grpc_slice_buffer_destroy_internal(exec_ctx, s->decompressed_data_buffer);
gpr_free(s->decompressed_data_buffer); gpr_free(s->decompressed_data_buffer);
@ -1300,12 +1297,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
/* Identify stream compression */ /* Identify stream compression */
if ((s->stream_compression_send_enabled = if (op_payload->send_initial_metadata.send_initial_metadata->idx.named.content_encoding == NULL ||
(op_payload->send_initial_metadata.send_initial_metadata->idx.named grpc_stream_compression_method_parse(GRPC_MDVALUE(op_payload->send_initial_metadata.send_initial_metadata->idx.named.content_encoding->md), true, &s->stream_compression_method) == 0) {
.content_encoding != NULL)) == true) { s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
s->compressed_data_buffer = gpr_malloc(sizeof(grpc_slice_buffer));
grpc_slice_buffer_init(s->compressed_data_buffer);
} }
grpc_slice_buffer_init(&s->compressed_data_buffer);
s->send_initial_metadata_finished = add_closure_barrier(on_complete); s->send_initial_metadata_finished = add_closure_barrier(on_complete);
s->send_initial_metadata = s->send_initial_metadata =

@ -557,10 +557,11 @@ struct grpc_chttp2_stream {
grpc_chttp2_write_cb *finish_after_write; grpc_chttp2_write_cb *finish_after_write;
size_t sending_bytes; size_t sending_bytes;
/** Whether stream compression send is enabled */ /* Stream compression method to be used. */
grpc_stream_compression_method stream_compression_method;
/* Stream decompression method to be used. */
grpc_stream_compression_method stream_decompression_method;
bool stream_compression_recv_enabled; bool stream_compression_recv_enabled;
/** Whether stream compression recv is enabled */
bool stream_compression_send_enabled;
/** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed
*/ */
bool unprocessed_incoming_frames_decompressed; bool unprocessed_incoming_frames_decompressed;
@ -570,7 +571,7 @@ struct grpc_chttp2_stream {
grpc_stream_compression_context *stream_compression_ctx; grpc_stream_compression_context *stream_compression_ctx;
/** Buffer storing data that is compressed but not sent */ /** Buffer storing data that is compressed but not sent */
grpc_slice_buffer *compressed_data_buffer; grpc_slice_buffer compressed_data_buffer;
/** Amount of uncompressed bytes sent out when compressed_data_buffer is /** Amount of uncompressed bytes sent out when compressed_data_buffer is
* emptied */ * emptied */
size_t uncompressed_data_size; size_t uncompressed_data_size;

@ -288,8 +288,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
if (sent_initial_metadata) { if (sent_initial_metadata) {
/* send any body bytes, if allowed by flow control */ /* send any body bytes, if allowed by flow control */
if (s->flow_controlled_buffer.length > 0 || if (s->flow_controlled_buffer.length > 0 ||
(s->stream_compression_send_enabled && s->compressed_data_buffer.length > 0) {
s->compressed_data_buffer->length > 0)) {
uint32_t stream_remote_window = (uint32_t)GPR_MAX( uint32_t stream_remote_window = (uint32_t)GPR_MAX(
0, 0,
s->flow_control.remote_window_delta + s->flow_control.remote_window_delta +
@ -302,56 +301,40 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
if (max_outgoing > 0) { if (max_outgoing > 0) {
bool is_last_data_frame = false; bool is_last_data_frame = false;
bool is_last_frame = false; bool is_last_frame = false;
if (s->stream_compression_send_enabled) { while ((s->flow_controlled_buffer.length > 0 ||
while ((s->flow_controlled_buffer.length > 0 || s->compressed_data_buffer.length > 0) &&
s->compressed_data_buffer->length > 0) && max_outgoing > 0) {
max_outgoing > 0) { if (s->compressed_data_buffer.length > 0) {
if (s->compressed_data_buffer->length > 0) { uint32_t send_bytes = (uint32_t)GPR_MIN(
uint32_t send_bytes = (uint32_t)GPR_MIN( max_outgoing, s->compressed_data_buffer.length);
max_outgoing, s->compressed_data_buffer->length); is_last_data_frame =
is_last_data_frame = (send_bytes == s->compressed_data_buffer.length &&
(send_bytes == s->compressed_data_buffer->length && s->flow_controlled_buffer.length == 0 &&
s->flow_controlled_buffer.length == 0 && s->fetching_send_message == NULL);
s->fetching_send_message == NULL); is_last_frame =
is_last_frame = is_last_data_frame && s->send_trailing_metadata != NULL &&
is_last_data_frame && s->send_trailing_metadata != NULL && grpc_metadata_batch_is_empty(s->send_trailing_metadata);
grpc_metadata_batch_is_empty(s->send_trailing_metadata); grpc_chttp2_encode_data(s->id, &s->compressed_data_buffer,
grpc_chttp2_encode_data(s->id, s->compressed_data_buffer, send_bytes, is_last_frame,
send_bytes, is_last_frame, &s->stats.outgoing, &t->outbuf);
&s->stats.outgoing, &t->outbuf); grpc_chttp2_flowctl_sent_data(&t->flow_control,
grpc_chttp2_flowctl_sent_data(&t->flow_control, &s->flow_control, send_bytes);
&s->flow_control, send_bytes); max_outgoing -= send_bytes;
max_outgoing -= send_bytes; if (s->compressed_data_buffer.length == 0) {
if (s->compressed_data_buffer->length == 0) { s->sending_bytes += s->uncompressed_data_size;
s->sending_bytes += s->uncompressed_data_size;
}
} else {
if (s->stream_compression_ctx == NULL) {
s->stream_compression_ctx =
grpc_stream_compression_context_create(
GRPC_STREAM_COMPRESSION_GZIP_COMPRESS);
}
s->uncompressed_data_size = s->flow_controlled_buffer.length;
GPR_ASSERT(grpc_stream_compress(
s->stream_compression_ctx, &s->flow_controlled_buffer,
s->compressed_data_buffer, NULL, MAX_SIZE_T,
GRPC_STREAM_COMPRESSION_FLUSH_SYNC));
} }
} else {
if (s->stream_compression_ctx == NULL) {
s->stream_compression_ctx =
grpc_stream_compression_context_create(
s->stream_compression_method);
}
s->uncompressed_data_size = s->flow_controlled_buffer.length;
GPR_ASSERT(grpc_stream_compress(
s->stream_compression_ctx, &s->flow_controlled_buffer,
&s->compressed_data_buffer, NULL, MAX_SIZE_T,
GRPC_STREAM_COMPRESSION_FLUSH_SYNC));
} }
} else {
uint32_t send_bytes = (uint32_t)GPR_MIN(
max_outgoing, s->flow_controlled_buffer.length);
is_last_data_frame = s->fetching_send_message == NULL &&
send_bytes == s->flow_controlled_buffer.length;
is_last_frame =
is_last_data_frame && s->send_trailing_metadata != NULL &&
grpc_metadata_batch_is_empty(s->send_trailing_metadata);
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer,
send_bytes, is_last_frame,
&s->stats.outgoing, &t->outbuf);
grpc_chttp2_flowctl_sent_data(&t->flow_control, &s->flow_control,
send_bytes);
s->sending_bytes += send_bytes;
} }
t->ping_state.pings_before_data_required = t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data; t->ping_policy.max_pings_without_data;
@ -371,8 +354,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
} }
now_writing = true; now_writing = true;
if (s->flow_controlled_buffer.length > 0 || if (s->flow_controlled_buffer.length > 0 ||
(s->stream_compression_send_enabled && s->compressed_data_buffer.length > 0) {
s->compressed_data_buffer->length > 0)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork");
grpc_chttp2_list_add_writable_stream(t, s); grpc_chttp2_list_add_writable_stream(t, s);
} }
@ -387,8 +369,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
if (s->send_trailing_metadata != NULL && if (s->send_trailing_metadata != NULL &&
s->fetching_send_message == NULL && s->fetching_send_message == NULL &&
s->flow_controlled_buffer.length == 0 && s->flow_controlled_buffer.length == 0 &&
(!s->stream_compression_send_enabled || s->compressed_data_buffer.length == 0) {
s->compressed_data_buffer->length == 0)) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata")); GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) { if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) {
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true, grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true,

Loading…
Cancel
Save