From 0aea92057932578e57cc1d2918ada3df8282fe1a Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 15 Mar 2017 15:36:44 -0700 Subject: [PATCH 1/6] Add compression support to cronet transport --- .../cronet/transport/cronet_transport.c | 60 +++++++++++++------ 1 file changed, 42 insertions(+), 18 deletions(-) diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index fabfaf8a270..fa2bdb8cf29 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -128,6 +128,7 @@ struct read_state { int received_bytes; int remaining_bytes; int length_field; + bool compressed; char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES]; char *payload_field; bool read_stream_closed; @@ -503,6 +504,7 @@ static void on_response_headers_received( is closed */ GPR_ASSERT(s->state.rs.length_field_received == false); s->state.rs.read_buffer = s->state.rs.grpc_header_bytes; + s->state.rs.compressed = false; s->state.rs.received_bytes = 0; s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); @@ -634,7 +636,7 @@ static void on_response_trailers_received( */ static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, char **pp_write_buffer, - size_t *p_write_buffer_size) { + size_t *p_write_buffer_size, uint32_t flags) { grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer); size_t length = GRPC_SLICE_LENGTH(slice); *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES; @@ -643,7 +645,7 @@ static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, *pp_write_buffer = write_buffer; uint8_t *p = (uint8_t *)write_buffer; /* Append 5 byte header */ - *p++ = 0; + *p++ = (flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0; *p++ = (uint8_t)(length >> 24); *p++ = (uint8_t)(length >> 16); *p++ = (uint8_t)(length >> 8); @@ -721,14 +723,26 @@ static void convert_metadata_to_cronet_headers( *p_num_headers = (size_t)num_headers; } -static int parse_grpc_header(const uint8_t *data) { +static bool parse_grpc_header(const uint8_t *data, + int *length, + bool *compressed) { + const uint8_t c = *data; const uint8_t *p = data + 1; - int length = 0; - length |= ((uint8_t)*p++) << 24; - length |= ((uint8_t)*p++) << 16; - length |= ((uint8_t)*p++) << 8; - length |= ((uint8_t)*p++); - return length; + *length = 0; + *length |= ((uint8_t)*p++) << 24; + *length |= ((uint8_t)*p++) << 16; + *length |= ((uint8_t)*p++) << 8; + *length |= ((uint8_t)*p++); + switch (c) { + case 0: + *compressed = false; + return true; + case 1: + *compressed = true; + return true; + default: + return false; + } } static bool header_has_authority(grpc_linked_mdelem *head) { @@ -948,12 +962,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_init(&write_slice_buffer); grpc_byte_stream_next(NULL, stream_op->send_message, &slice, stream_op->send_message->length, NULL); - /* Check that compression flag is OFF. We don't support compression yet. - */ - if (stream_op->send_message->flags != 0) { - gpr_log(GPR_ERROR, "Compression is not supported"); - GPR_ASSERT(stream_op->send_message->flags == 0); - } grpc_slice_buffer_add(&write_slice_buffer, slice); if (write_slice_buffer.count != 1) { /* Empty request not handled yet */ @@ -963,7 +971,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, if (write_slice_buffer.count > 0) { size_t write_buffer_size; create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, - &write_buffer_size); + &write_buffer_size, stream_op->send_message->flags); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs, stream_state->ws.write_buffer); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; @@ -1059,8 +1067,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->rs.remaining_bytes == 0) { /* Start a read operation for data */ stream_state->rs.length_field_received = true; - stream_state->rs.length_field = stream_state->rs.remaining_bytes = - parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer); + if (parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer, + &stream_state->rs.length_field, + &stream_state->rs.compressed)) { + stream_state->rs.remaining_bytes = stream_state->rs.length_field; + } else { + /* Error deframing the data frame. */ + CRONET_LOG(GPR_DEBUG, "stream deframing error"); + GPR_ASSERT(false); + } CRONET_LOG(GPR_DEBUG, "length field = %d", stream_state->rs.length_field); if (stream_state->rs.length_field > 0) { @@ -1082,6 +1097,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0); + if (stream_state->rs.compressed) { + stream_state->rs.sbs.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; + } *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, @@ -1093,6 +1111,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.received_bytes = 0; + stream_state->rs.compressed = false; stream_state->rs.length_field_received = false; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); stream_state->state_op_done[OP_READ_REQ_MADE] = @@ -1107,6 +1126,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.received_bytes = 0; + stream_state->rs.compressed = false; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); stream_state->state_op_done[OP_READ_REQ_MADE] = true; /* Indicates that at least one read request has been made */ @@ -1130,6 +1150,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, read_data_slice); grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0); + if (stream_state->rs.compressed) { + stream_state->rs.sbs.base.flags = GRPC_WRITE_INTERNAL_COMPRESS; + } *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, @@ -1139,6 +1162,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, /* Do an extra read to trigger on_succeeded() callback in case connection is closed */ stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; + stream_state->rs.compressed = false; stream_state->rs.received_bytes = 0; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.length_field_received = false; From 780cf2a9c15fc41751c42caeada2b8340cd034cd Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Fri, 17 Mar 2017 14:36:01 -0700 Subject: [PATCH 2/6] Fix that there is no :status header in the case of trailer-only failure reply --- .../chttp2/transport/chttp2_transport.c | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index a3684535ff0..042a89277db 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1760,6 +1760,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error) { grpc_slice hdr; grpc_slice status_hdr; + grpc_slice http_status_hdr; grpc_slice message_pfx; uint8_t *p; uint32_t len = 0; @@ -1775,6 +1776,26 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, It's complicated by the fact that our send machinery would be dead by the time we got around to sending this, so instead we ignore HPACK compression and just write the uncompressed bytes onto the wire. */ + if (!s->sent_initial_metadata) { + http_status_hdr = grpc_slice_malloc(13); + p = GRPC_SLICE_START_PTR(http_status_hdr); + *p++ = 0x00; + *p++ = 7; + *p++ = ':'; + *p++ = 's'; + *p++ = 't'; + *p++ = 'a'; + *p++ = 't'; + *p++ = 'u'; + *p++ = 's'; + *p++ = 3; + *p++ = '2'; + *p++ = '0'; + *p++ = '0'; + GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr)); + len += (uint32_t)GRPC_SLICE_LENGTH(http_status_hdr); + } + status_hdr = grpc_slice_malloc(15 + (grpc_status >= 10)); p = GRPC_SLICE_START_PTR(status_hdr); *p++ = 0x00; /* literal header, not indexed */ @@ -1842,6 +1863,9 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr)); grpc_slice_buffer_add(&t->qbuf, hdr); + if (!s->sent_initial_metadata) { + grpc_slice_buffer_add(&t->qbuf, http_status_hdr); + } grpc_slice_buffer_add(&t->qbuf, status_hdr); if (msg != NULL) { grpc_slice_buffer_add(&t->qbuf, message_pfx); From 59660e55709d29f71d696221d77e0f5d6cc76f7a Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Fri, 17 Mar 2017 15:54:06 -0700 Subject: [PATCH 3/6] Add supprot of processing trailer-only response in cronet_transport.c --- .../cronet/transport/cronet_transport.c | 19 +++++++++++++++++-- .../CoreCronetEnd2EndTests.m | 3 +-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index fa2bdb8cf29..791f0416ad0 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -484,6 +484,16 @@ static void on_response_headers_received( CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, headers, negotiated_protocol); stream_obj *s = (stream_obj *)stream->annotation; + + /* Identify if this is a header or a trailer (in a trailer-only response case) + */ + for (size_t i = 0; i < headers->count; i++) { + if (0 == strcmp("grpc-status", headers->headers[i].key)) { + on_response_trailers_received(stream, headers); + return; + } + } + gpr_mu_lock(&s->mu); memset(&s->state.rs.initial_metadata, 0, sizeof(s->state.rs.initial_metadata)); @@ -795,7 +805,8 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false; /* we haven't received headers yet. */ - else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA]) + else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] && + !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false; } else if (op_id == OP_SEND_MESSAGE) { /* already executed (note we're checking op specific state, not stream @@ -808,7 +819,8 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, /* already executed */ if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false; /* we haven't received headers yet. */ - else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA]) + else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] && + !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false; } else if (op_id == OP_RECV_TRAILING_METADATA) { /* already executed */ @@ -1023,6 +1035,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } else if (stream_state->state_callback_received[OP_FAILED]) { grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, GRPC_ERROR_NONE); + } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_NONE); } else { grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &oas->s->state.rs.initial_metadata, diff --git a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m index 1e0c8024cab..3b442645e83 100644 --- a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m +++ b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m @@ -273,8 +273,7 @@ static char *roots_filename; } - (void)testCompressedPayload { - // NOT SUPPORTED - // [self testIndividualCase:"compressed_payload"]; + [self testIndividualCase:"compressed_payload"]; } - (void)testConnectivity { From eb6f20128ac2f902d9d6ad7e49a51897b3e06d44 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 20 Mar 2017 13:53:59 -0700 Subject: [PATCH 4/6] Improvement on reading compression byt --- .../cronet/transport/cronet_transport.c | 25 ++++--------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 791f0416ad0..4d80501ba84 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -733,26 +733,17 @@ static void convert_metadata_to_cronet_headers( *p_num_headers = (size_t)num_headers; } -static bool parse_grpc_header(const uint8_t *data, +static void parse_grpc_header(const uint8_t *data, int *length, bool *compressed) { const uint8_t c = *data; const uint8_t *p = data + 1; + *compressed = ((c & 0x01) == 0x01); *length = 0; *length |= ((uint8_t)*p++) << 24; *length |= ((uint8_t)*p++) << 16; *length |= ((uint8_t)*p++) << 8; *length |= ((uint8_t)*p++); - switch (c) { - case 0: - *compressed = false; - return true; - case 1: - *compressed = true; - return true; - default: - return false; - } } static bool header_has_authority(grpc_linked_mdelem *head) { @@ -1082,15 +1073,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->rs.remaining_bytes == 0) { /* Start a read operation for data */ stream_state->rs.length_field_received = true; - if (parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer, - &stream_state->rs.length_field, - &stream_state->rs.compressed)) { - stream_state->rs.remaining_bytes = stream_state->rs.length_field; - } else { - /* Error deframing the data frame. */ - CRONET_LOG(GPR_DEBUG, "stream deframing error"); - GPR_ASSERT(false); - } + parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer, + &stream_state->rs.length_field, + &stream_state->rs.compressed); CRONET_LOG(GPR_DEBUG, "length field = %d", stream_state->rs.length_field); if (stream_state->rs.length_field > 0) { From b4478ff26496ceb3b92c8ad768bfe02121c5d158 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 20 Mar 2017 18:56:57 -0700 Subject: [PATCH 5/6] Comment --- src/core/ext/transport/cronet/transport/cronet_transport.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 4d80501ba84..50dee35786c 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -655,7 +655,9 @@ static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, *pp_write_buffer = write_buffer; uint8_t *p = (uint8_t *)write_buffer; /* Append 5 byte header */ + /* Compressed flag */ *p++ = (flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0; + /* Message length */ *p++ = (uint8_t)(length >> 24); *p++ = (uint8_t)(length >> 16); *p++ = (uint8_t)(length >> 8); From 94f3dddfdc584b9de7e61f14e97183833dc5672e Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 21 Mar 2017 10:34:12 -0700 Subject: [PATCH 6/6] clang-format --- src/core/ext/transport/cronet/transport/cronet_transport.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 50dee35786c..67bf6bb1519 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -735,8 +735,7 @@ static void convert_metadata_to_cronet_headers( *p_num_headers = (size_t)num_headers; } -static void parse_grpc_header(const uint8_t *data, - int *length, +static void parse_grpc_header(const uint8_t *data, int *length, bool *compressed) { const uint8_t c = *data; const uint8_t *p = data + 1;