Add compression support to cronet transport

pull/10210/head
Muxi Yan 8 years ago
parent f0fb7413ea
commit 0aea920579
  1. 60
      src/core/ext/transport/cronet/transport/cronet_transport.c

@ -128,6 +128,7 @@ struct read_state {
int received_bytes; int received_bytes;
int remaining_bytes; int remaining_bytes;
int length_field; int length_field;
bool compressed;
char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES]; char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES];
char *payload_field; char *payload_field;
bool read_stream_closed; bool read_stream_closed;
@ -503,6 +504,7 @@ static void on_response_headers_received(
is closed */ is closed */
GPR_ASSERT(s->state.rs.length_field_received == false); GPR_ASSERT(s->state.rs.length_field_received == false);
s->state.rs.read_buffer = s->state.rs.grpc_header_bytes; s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
s->state.rs.compressed = false;
s->state.rs.received_bytes = 0; s->state.rs.received_bytes = 0;
s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
@ -634,7 +636,7 @@ static void on_response_trailers_received(
*/ */
static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer,
char **pp_write_buffer, char **pp_write_buffer,
size_t *p_write_buffer_size) { size_t *p_write_buffer_size, uint32_t flags) {
grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer); grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
size_t length = GRPC_SLICE_LENGTH(slice); size_t length = GRPC_SLICE_LENGTH(slice);
*p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES; *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
@ -643,7 +645,7 @@ static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer,
*pp_write_buffer = write_buffer; *pp_write_buffer = write_buffer;
uint8_t *p = (uint8_t *)write_buffer; uint8_t *p = (uint8_t *)write_buffer;
/* Append 5 byte header */ /* Append 5 byte header */
*p++ = 0; *p++ = (flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0;
*p++ = (uint8_t)(length >> 24); *p++ = (uint8_t)(length >> 24);
*p++ = (uint8_t)(length >> 16); *p++ = (uint8_t)(length >> 16);
*p++ = (uint8_t)(length >> 8); *p++ = (uint8_t)(length >> 8);
@ -721,14 +723,26 @@ static void convert_metadata_to_cronet_headers(
*p_num_headers = (size_t)num_headers; *p_num_headers = (size_t)num_headers;
} }
static int parse_grpc_header(const uint8_t *data) { static bool parse_grpc_header(const uint8_t *data,
int *length,
bool *compressed) {
const uint8_t c = *data;
const uint8_t *p = data + 1; const uint8_t *p = data + 1;
int length = 0; *length = 0;
length |= ((uint8_t)*p++) << 24; *length |= ((uint8_t)*p++) << 24;
length |= ((uint8_t)*p++) << 16; *length |= ((uint8_t)*p++) << 16;
length |= ((uint8_t)*p++) << 8; *length |= ((uint8_t)*p++) << 8;
length |= ((uint8_t)*p++); *length |= ((uint8_t)*p++);
return length; switch (c) {
case 0:
*compressed = false;
return true;
case 1:
*compressed = true;
return true;
default:
return false;
}
} }
static bool header_has_authority(grpc_linked_mdelem *head) { static bool header_has_authority(grpc_linked_mdelem *head) {
@ -948,12 +962,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_init(&write_slice_buffer); grpc_slice_buffer_init(&write_slice_buffer);
grpc_byte_stream_next(NULL, stream_op->send_message, &slice, grpc_byte_stream_next(NULL, stream_op->send_message, &slice,
stream_op->send_message->length, NULL); stream_op->send_message->length, NULL);
/* Check that compression flag is OFF. We don't support compression yet.
*/
if (stream_op->send_message->flags != 0) {
gpr_log(GPR_ERROR, "Compression is not supported");
GPR_ASSERT(stream_op->send_message->flags == 0);
}
grpc_slice_buffer_add(&write_slice_buffer, slice); grpc_slice_buffer_add(&write_slice_buffer, slice);
if (write_slice_buffer.count != 1) { if (write_slice_buffer.count != 1) {
/* Empty request not handled yet */ /* Empty request not handled yet */
@ -963,7 +971,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
if (write_slice_buffer.count > 0) { if (write_slice_buffer.count > 0) {
size_t write_buffer_size; size_t write_buffer_size;
create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
&write_buffer_size); &write_buffer_size, stream_op->send_message->flags);
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs, CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
stream_state->ws.write_buffer); stream_state->ws.write_buffer);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false; stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
@ -1059,8 +1067,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->rs.remaining_bytes == 0) { stream_state->rs.remaining_bytes == 0) {
/* Start a read operation for data */ /* Start a read operation for data */
stream_state->rs.length_field_received = true; stream_state->rs.length_field_received = true;
stream_state->rs.length_field = stream_state->rs.remaining_bytes = if (parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer,
parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer); &stream_state->rs.length_field,
&stream_state->rs.compressed)) {
stream_state->rs.remaining_bytes = stream_state->rs.length_field;
} else {
/* Error deframing the data frame. */
CRONET_LOG(GPR_DEBUG, "stream deframing error");
GPR_ASSERT(false);
}
CRONET_LOG(GPR_DEBUG, "length field = %d", CRONET_LOG(GPR_DEBUG, "length field = %d",
stream_state->rs.length_field); stream_state->rs.length_field);
if (stream_state->rs.length_field > 0) { if (stream_state->rs.length_field > 0) {
@ -1082,6 +1097,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_stream_init(&stream_state->rs.sbs, grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
&stream_state->rs.read_slice_buffer, 0); &stream_state->rs.read_slice_buffer, 0);
if (stream_state->rs.compressed) {
stream_state->rs.sbs.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
*((grpc_byte_buffer **)stream_op->recv_message) = *((grpc_byte_buffer **)stream_op->recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs; (grpc_byte_buffer *)&stream_state->rs.sbs;
grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
@ -1093,6 +1111,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
stream_state->rs.received_bytes = 0; stream_state->rs.received_bytes = 0;
stream_state->rs.compressed = false;
stream_state->rs.length_field_received = false; stream_state->rs.length_field_received = false;
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
stream_state->state_op_done[OP_READ_REQ_MADE] = stream_state->state_op_done[OP_READ_REQ_MADE] =
@ -1107,6 +1126,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
stream_state->rs.received_bytes = 0; stream_state->rs.received_bytes = 0;
stream_state->rs.compressed = false;
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
stream_state->state_op_done[OP_READ_REQ_MADE] = stream_state->state_op_done[OP_READ_REQ_MADE] =
true; /* Indicates that at least one read request has been made */ true; /* Indicates that at least one read request has been made */
@ -1130,6 +1150,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
read_data_slice); read_data_slice);
grpc_slice_buffer_stream_init(&stream_state->rs.sbs, grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
&stream_state->rs.read_slice_buffer, 0); &stream_state->rs.read_slice_buffer, 0);
if (stream_state->rs.compressed) {
stream_state->rs.sbs.base.flags = GRPC_WRITE_INTERNAL_COMPRESS;
}
*((grpc_byte_buffer **)stream_op->recv_message) = *((grpc_byte_buffer **)stream_op->recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs; (grpc_byte_buffer *)&stream_state->rs.sbs;
grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
@ -1139,6 +1162,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
/* Do an extra read to trigger on_succeeded() callback in case connection /* Do an extra read to trigger on_succeeded() callback in case connection
is closed */ is closed */
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
stream_state->rs.compressed = false;
stream_state->rs.received_bytes = 0; stream_state->rs.received_bytes = 0;
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
stream_state->rs.length_field_received = false; stream_state->rs.length_field_received = false;

Loading…
Cancel
Save