Merge changes in transport from #11780

pull/11782/head
Muxi Yan 7 years ago
commit 6a9833b377
  1. 15
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 7
      src/core/ext/transport/chttp2/transport/internal.h
  3. 2
      src/core/ext/transport/chttp2/transport/writing.c

@ -1357,8 +1357,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
"fetching_send_message_finished"); "fetching_send_message_finished");
} else { } else {
GPR_ASSERT(s->fetching_send_message == NULL); GPR_ASSERT(s->fetching_send_message == NULL);
uint8_t *frame_hdr = uint8_t *frame_hdr = grpc_slice_buffer_tiny_add(
grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5); &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
uint32_t flags = op_payload->send_message.send_message->flags; uint32_t flags = op_payload->send_message.send_message->flags;
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
size_t len = op_payload->send_message.send_message->length; size_t len = op_payload->send_message.send_message->length;
@ -1450,8 +1450,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
s->recv_message = op_payload->recv_message.recv_message; s->recv_message = op_payload->recv_message.recv_message;
if (s->id != 0) { if (s->id != 0) {
already_received = s->frame_storage.length; already_received = s->frame_storage.length;
incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, incoming_byte_stream_update_flow_control(
already_received); exec_ctx, t, s, GRPC_HEADER_SIZE_IN_BYTES, already_received);
} }
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
} }
@ -1686,7 +1686,8 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
} }
if (!grpc_stream_decompress(s->stream_decompression_ctx, if (!grpc_stream_decompress(s->stream_decompression_ctx,
&s->unprocessed_incoming_frames_buffer, &s->unprocessed_incoming_frames_buffer,
&s->decompressed_data_buffer, NULL, 5, &s->decompressed_data_buffer, NULL,
GRPC_HEADER_SIZE_IN_BYTES,
&end_of_context)) { &end_of_context)) {
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
&s->frame_storage); &s->frame_storage);
@ -1760,7 +1761,7 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
if (!grpc_stream_decompress(s->stream_decompression_ctx, if (!grpc_stream_decompress(s->stream_decompression_ctx,
&s->frame_storage, &s->frame_storage,
&s->unprocessed_incoming_frames_buffer, NULL, &s->unprocessed_incoming_frames_buffer, NULL,
5, &end_of_context)) { GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
grpc_slice_buffer_reset_and_unref_internal( grpc_slice_buffer_reset_and_unref_internal(
exec_ctx, &s->unprocessed_incoming_frames_buffer); exec_ctx, &s->unprocessed_incoming_frames_buffer);
@ -2719,7 +2720,7 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
if (!grpc_stream_decompress(s->stream_decompression_ctx, if (!grpc_stream_decompress(s->stream_decompression_ctx,
&s->unprocessed_incoming_frames_buffer, &s->unprocessed_incoming_frames_buffer,
&s->decompressed_data_buffer, NULL, &s->decompressed_data_buffer, NULL,
~(size_t)0, &end_of_context)) { MAX_SIZE_T, &end_of_context)) {
error = error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error."); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
return error; return error;

@ -520,9 +520,9 @@ struct grpc_chttp2_stream {
grpc_chttp2_write_cb *finish_after_write; grpc_chttp2_write_cb *finish_after_write;
size_t sending_bytes; size_t sending_bytes;
/** Whether stream compression send is enabled or not */ /** Whether stream compression send is enabled */
bool stream_compression_recv_enabled; bool stream_compression_recv_enabled;
/** Whether stream compression recv is enabled or not */ /** Whether stream compression recv is enabled */
bool stream_compression_send_enabled; bool stream_compression_send_enabled;
/** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed
*/ */
@ -634,6 +634,9 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
grpc_closure **pclosure, grpc_closure **pclosure,
grpc_error *error, const char *desc); grpc_error *error, const char *desc);
#define GRPC_HEADER_SIZE_IN_BYTES 5
#define MAX_SIZE_T (~(size_t)0)
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
(sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1) (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)

@ -351,7 +351,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
s->uncompressed_data_size = s->flow_controlled_buffer.length; s->uncompressed_data_size = s->flow_controlled_buffer.length;
GPR_ASSERT(grpc_stream_compress( GPR_ASSERT(grpc_stream_compress(
s->stream_compression_ctx, &s->flow_controlled_buffer, s->stream_compression_ctx, &s->flow_controlled_buffer,
&s->compressed_data_buffer, NULL, ~(size_t)0, &s->compressed_data_buffer, NULL, MAX_SIZE_T,
GRPC_STREAM_COMPRESSION_FLUSH_SYNC)); GRPC_STREAM_COMPRESSION_FLUSH_SYNC));
} }
} }

Loading…
Cancel
Save