From 7cd1425e29274a32bdbf253a586dbca2318dfb29 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 22 Jun 2017 10:49:43 -0700 Subject: [PATCH 1/2] Fix handling of send_message before send_initial_metadata in compress filter. --- .../message_compress_filter.c | 35 +++++--- test/core/end2end/tests/compressed_payload.c | 89 +++++++++++++------ 2 files changed, 85 insertions(+), 39 deletions(-) diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c index 04cb1d94f80..71a8bc5bec2 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.c +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c @@ -255,6 +255,23 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, } } +static void handle_send_message_batch(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *op, + bool has_compression_algorithm) { + call_data *calld = elem->call_data; + if (!skip_compression(elem, op->payload->send_message.send_message->flags, + has_compression_algorithm)) { + calld->send_op = op; + calld->send_length = op->payload->send_message.send_message->length; + calld->send_flags = op->payload->send_message.send_message->flags; + continue_send_message(exec_ctx, elem); + } else { + /* pass control down the stack */ + grpc_call_next_op(exec_ctx, elem, op); + } +} + static void compress_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *op) { @@ -307,8 +324,9 @@ static void compress_start_transport_stream_op_batch( goto retry_send_im; } if (cur != INITIAL_METADATA_UNSEEN) { - grpc_call_next_op(exec_ctx, elem, - (grpc_transport_stream_op_batch *)cur); + handle_send_message_batch(exec_ctx, elem, + (grpc_transport_stream_op_batch *)cur, + has_compression_algorithm); } } } @@ -325,17 +343,8 @@ static void compress_start_transport_stream_op_batch( break; case HAS_COMPRESSION_ALGORITHM: case NO_COMPRESSION_ALGORITHM: - if (!skip_compression(elem, - op->payload->send_message.send_message->flags, - cur == HAS_COMPRESSION_ALGORITHM)) { - calld->send_op = op; - calld->send_length = op->payload->send_message.send_message->length; - calld->send_flags = op->payload->send_message.send_message->flags; - continue_send_message(exec_ctx, elem); - } else { - /* pass control down the stack */ - grpc_call_next_op(exec_ctx, elem, op); - } + handle_send_message_batch(exec_ctx, elem, op, + cur == HAS_COMPRESSION_ALGORITHM); break; default: if (cur & CANCELLED_BIT) { diff --git a/test/core/end2end/tests/compressed_payload.c b/test/core/end2end/tests/compressed_payload.c index 35e2fd13ddf..e5e0a5ead42 100644 --- a/test/core/end2end/tests/compressed_payload.c +++ b/test/core/end2end/tests/compressed_payload.c @@ -274,7 +274,8 @@ static void request_with_payload_template( grpc_compression_algorithm expected_algorithm_from_client, grpc_compression_algorithm expected_algorithm_from_server, grpc_metadata *client_init_metadata, bool set_server_level, - grpc_compression_level server_compression_level) { + grpc_compression_level server_compression_level, + bool send_message_before_initial_metadata) { grpc_call *c; grpc_call *s; grpc_slice request_payload_slice; @@ -330,6 +331,20 @@ static void request_with_payload_template( grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + if (send_message_before_initial_metadata) { + request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload; + op->flags = client_send_flags_bitmask; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(2), true); + } + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; @@ -394,23 +409,21 @@ static void request_with_payload_template( GPR_ASSERT(GRPC_CALL_OK == error); for (int i = 0; i < 2; i++) { - request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1); - memset(ops, 0, sizeof(ops)); - op = ops; - op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message.send_message = request_payload; - op->flags = client_send_flags_bitmask; - op->reserved = NULL; - op++; - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message.recv_message = &response_payload_recv; - op->flags = 0; - op->reserved = NULL; - op++; - error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL); - GPR_ASSERT(GRPC_CALL_OK == error); + if (i > 0 || !send_message_before_initial_metadata) { + request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload; + op->flags = client_send_flags_bitmask; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(2), 1); + } memset(ops, 0, sizeof(ops)); op = ops; @@ -421,6 +434,7 @@ static void request_with_payload_template( op++; error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(102), 1); cq_verify(cqv); @@ -438,8 +452,19 @@ static void request_with_payload_template( op++; error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); GPR_ASSERT(GRPC_CALL_OK == error); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(103), 1); - CQ_EXPECT_COMPLETION(cqv, tag(2), 1); + CQ_EXPECT_COMPLETION(cqv, tag(3), 1); cq_verify(cqv); GPR_ASSERT(response_payload_recv->type == GRPC_BB_RAW); @@ -469,7 +494,7 @@ static void request_with_payload_template( op->flags = 0; op->reserved = NULL; op++; - error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL); + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(4), NULL); GPR_ASSERT(GRPC_CALL_OK == error); memset(ops, 0, sizeof(ops)); @@ -486,7 +511,7 @@ static void request_with_payload_template( GPR_ASSERT(GRPC_CALL_OK == error); CQ_EXPECT_COMPLETION(cqv, tag(1), 1); - CQ_EXPECT_COMPLETION(cqv, tag(3), 1); + CQ_EXPECT_COMPLETION(cqv, tag(4), 1); CQ_EXPECT_COMPLETION(cqv, tag(101), 1); CQ_EXPECT_COMPLETION(cqv, tag(104), 1); cq_verify(cqv); @@ -526,7 +551,7 @@ static void test_invoke_request_with_exceptionally_uncompressed_payload( config, "test_invoke_request_with_exceptionally_uncompressed_payload", GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP, NULL, false, - /* ignored */ GRPC_COMPRESS_LEVEL_NONE); + /* ignored */ GRPC_COMPRESS_LEVEL_NONE, false); } static void test_invoke_request_with_uncompressed_payload( @@ -534,7 +559,8 @@ static void test_invoke_request_with_uncompressed_payload( request_with_payload_template( config, "test_invoke_request_with_uncompressed_payload", 0, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, - GRPC_COMPRESS_NONE, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE); + GRPC_COMPRESS_NONE, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE, + false); } static void test_invoke_request_with_compressed_payload( @@ -542,7 +568,17 @@ static void test_invoke_request_with_compressed_payload( request_with_payload_template( config, "test_invoke_request_with_compressed_payload", 0, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, - GRPC_COMPRESS_GZIP, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE); + GRPC_COMPRESS_GZIP, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE, + false); +} + +static void test_invoke_request_with_send_message_before_initial_metadata( + grpc_end2end_test_config config) { + request_with_payload_template( + config, "test_invoke_request_with_compressed_payload", 0, + GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, + GRPC_COMPRESS_GZIP, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE, + true); } static void test_invoke_request_with_server_level( @@ -550,7 +586,7 @@ static void test_invoke_request_with_server_level( request_with_payload_template( config, "test_invoke_request_with_server_level", 0, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE /* ignored */, - NULL, true, GRPC_COMPRESS_LEVEL_HIGH); + NULL, true, GRPC_COMPRESS_LEVEL_HIGH, false); } static void test_invoke_request_with_compressed_payload_md_override( @@ -574,21 +610,21 @@ static void test_invoke_request_with_compressed_payload_md_override( config, "test_invoke_request_with_compressed_payload_md_override_1", 0, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_NONE, &gzip_compression_override, false, - /*ignored*/ GRPC_COMPRESS_LEVEL_NONE); + /*ignored*/ GRPC_COMPRESS_LEVEL_NONE, false); /* Channel default DEFLATE, call override to GZIP */ request_with_payload_template( config, "test_invoke_request_with_compressed_payload_md_override_2", 0, GRPC_COMPRESS_DEFLATE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_NONE, &gzip_compression_override, false, - /*ignored*/ GRPC_COMPRESS_LEVEL_NONE); + /*ignored*/ GRPC_COMPRESS_LEVEL_NONE, false); /* Channel default DEFLATE, call override to NONE (aka IDENTITY) */ request_with_payload_template( config, "test_invoke_request_with_compressed_payload_md_override_3", 0, GRPC_COMPRESS_DEFLATE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, &identity_compression_override, false, - /*ignored*/ GRPC_COMPRESS_LEVEL_NONE); + /*ignored*/ GRPC_COMPRESS_LEVEL_NONE, false); } static void test_invoke_request_with_disabled_algorithm( @@ -602,6 +638,7 @@ void compressed_payload(grpc_end2end_test_config config) { test_invoke_request_with_exceptionally_uncompressed_payload(config); test_invoke_request_with_uncompressed_payload(config); test_invoke_request_with_compressed_payload(config); + test_invoke_request_with_send_message_before_initial_metadata(config); test_invoke_request_with_server_level(config); test_invoke_request_with_compressed_payload_md_override(config); test_invoke_request_with_disabled_algorithm(config); From 5dc774199b18b94642eae61c865d7f64d7d6ad8f Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 23 Jun 2017 09:13:16 -0700 Subject: [PATCH 2/2] Fix Windows compiler problem. --- test/core/end2end/tests/compressed_payload.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/core/end2end/tests/compressed_payload.c b/test/core/end2end/tests/compressed_payload.c index e5e0a5ead42..429717a7be0 100644 --- a/test/core/end2end/tests/compressed_payload.c +++ b/test/core/end2end/tests/compressed_payload.c @@ -279,7 +279,7 @@ static void request_with_payload_template( grpc_call *c; grpc_call *s; grpc_slice request_payload_slice; - grpc_byte_buffer *request_payload; + grpc_byte_buffer *request_payload = NULL; grpc_channel_args *client_args; grpc_channel_args *server_args; grpc_end2end_test_fixture f;