Merge pull request #11577 from markdroth/compression_filter_fix

Fix handling of send_message before send_initial_metadata in compress filter.
pull/11070/head
Mark D. Roth 8 years ago committed by GitHub
commit bf527617bf
  1. 35
      src/core/ext/filters/http/message_compress/message_compress_filter.c
  2. 91
      test/core/end2end/tests/compressed_payload.c

@ -255,6 +255,23 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
}
}
static void handle_send_message_batch(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op_batch *op,
bool has_compression_algorithm) {
call_data *calld = elem->call_data;
if (!skip_compression(elem, op->payload->send_message.send_message->flags,
has_compression_algorithm)) {
calld->send_op = op;
calld->send_length = op->payload->send_message.send_message->length;
calld->send_flags = op->payload->send_message.send_message->flags;
continue_send_message(exec_ctx, elem);
} else {
/* pass control down the stack */
grpc_call_next_op(exec_ctx, elem, op);
}
}
static void compress_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
@ -307,8 +324,9 @@ static void compress_start_transport_stream_op_batch(
goto retry_send_im;
}
if (cur != INITIAL_METADATA_UNSEEN) {
grpc_call_next_op(exec_ctx, elem,
(grpc_transport_stream_op_batch *)cur);
handle_send_message_batch(exec_ctx, elem,
(grpc_transport_stream_op_batch *)cur,
has_compression_algorithm);
}
}
}
@ -325,17 +343,8 @@ static void compress_start_transport_stream_op_batch(
break;
case HAS_COMPRESSION_ALGORITHM:
case NO_COMPRESSION_ALGORITHM:
if (!skip_compression(elem,
op->payload->send_message.send_message->flags,
cur == HAS_COMPRESSION_ALGORITHM)) {
calld->send_op = op;
calld->send_length = op->payload->send_message.send_message->length;
calld->send_flags = op->payload->send_message.send_message->flags;
continue_send_message(exec_ctx, elem);
} else {
/* pass control down the stack */
grpc_call_next_op(exec_ctx, elem, op);
}
handle_send_message_batch(exec_ctx, elem, op,
cur == HAS_COMPRESSION_ALGORITHM);
break;
default:
if (cur & CANCELLED_BIT) {

@ -274,11 +274,12 @@ static void request_with_payload_template(
grpc_compression_algorithm expected_algorithm_from_client,
grpc_compression_algorithm expected_algorithm_from_server,
grpc_metadata *client_init_metadata, bool set_server_level,
grpc_compression_level server_compression_level) {
grpc_compression_level server_compression_level,
bool send_message_before_initial_metadata) {
grpc_call *c;
grpc_call *s;
grpc_slice request_payload_slice;
grpc_byte_buffer *request_payload;
grpc_byte_buffer *request_payload = NULL;
grpc_channel_args *client_args;
grpc_channel_args *server_args;
grpc_end2end_test_fixture f;
@ -330,6 +331,20 @@ static void request_with_payload_template(
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
if (send_message_before_initial_metadata) {
request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op->flags = client_send_flags_bitmask;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(2), true);
}
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
@ -394,23 +409,21 @@ static void request_with_payload_template(
GPR_ASSERT(GRPC_CALL_OK == error);
for (int i = 0; i < 2; i++) {
request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op->flags = client_send_flags_bitmask;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &response_payload_recv;
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
if (i > 0 || !send_message_before_initial_metadata) {
request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op->flags = client_send_flags_bitmask;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(2), 1);
}
memset(ops, 0, sizeof(ops));
op = ops;
@ -421,6 +434,7 @@ static void request_with_payload_template(
op++;
error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
cq_verify(cqv);
@ -438,8 +452,19 @@ static void request_with_payload_template(
op++;
error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &response_payload_recv;
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(103), 1);
CQ_EXPECT_COMPLETION(cqv, tag(2), 1);
CQ_EXPECT_COMPLETION(cqv, tag(3), 1);
cq_verify(cqv);
GPR_ASSERT(response_payload_recv->type == GRPC_BB_RAW);
@ -469,7 +494,7 @@ static void request_with_payload_template(
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL);
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(4), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
memset(ops, 0, sizeof(ops));
@ -486,7 +511,7 @@ static void request_with_payload_template(
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
CQ_EXPECT_COMPLETION(cqv, tag(3), 1);
CQ_EXPECT_COMPLETION(cqv, tag(4), 1);
CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
CQ_EXPECT_COMPLETION(cqv, tag(104), 1);
cq_verify(cqv);
@ -526,7 +551,7 @@ static void test_invoke_request_with_exceptionally_uncompressed_payload(
config, "test_invoke_request_with_exceptionally_uncompressed_payload",
GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP,
GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP, NULL, false,
/* ignored */ GRPC_COMPRESS_LEVEL_NONE);
/* ignored */ GRPC_COMPRESS_LEVEL_NONE, false);
}
static void test_invoke_request_with_uncompressed_payload(
@ -534,7 +559,8 @@ static void test_invoke_request_with_uncompressed_payload(
request_with_payload_template(
config, "test_invoke_request_with_uncompressed_payload", 0,
GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE,
GRPC_COMPRESS_NONE, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE);
GRPC_COMPRESS_NONE, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE,
false);
}
static void test_invoke_request_with_compressed_payload(
@ -542,7 +568,17 @@ static void test_invoke_request_with_compressed_payload(
request_with_payload_template(
config, "test_invoke_request_with_compressed_payload", 0,
GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP,
GRPC_COMPRESS_GZIP, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE);
GRPC_COMPRESS_GZIP, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE,
false);
}
static void test_invoke_request_with_send_message_before_initial_metadata(
grpc_end2end_test_config config) {
request_with_payload_template(
config, "test_invoke_request_with_compressed_payload", 0,
GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP,
GRPC_COMPRESS_GZIP, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE,
true);
}
static void test_invoke_request_with_server_level(
@ -550,7 +586,7 @@ static void test_invoke_request_with_server_level(
request_with_payload_template(
config, "test_invoke_request_with_server_level", 0, GRPC_COMPRESS_NONE,
GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE /* ignored */,
NULL, true, GRPC_COMPRESS_LEVEL_HIGH);
NULL, true, GRPC_COMPRESS_LEVEL_HIGH, false);
}
static void test_invoke_request_with_compressed_payload_md_override(
@ -574,21 +610,21 @@ static void test_invoke_request_with_compressed_payload_md_override(
config, "test_invoke_request_with_compressed_payload_md_override_1", 0,
GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP,
GRPC_COMPRESS_NONE, &gzip_compression_override, false,
/*ignored*/ GRPC_COMPRESS_LEVEL_NONE);
/*ignored*/ GRPC_COMPRESS_LEVEL_NONE, false);
/* Channel default DEFLATE, call override to GZIP */
request_with_payload_template(
config, "test_invoke_request_with_compressed_payload_md_override_2", 0,
GRPC_COMPRESS_DEFLATE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP,
GRPC_COMPRESS_NONE, &gzip_compression_override, false,
/*ignored*/ GRPC_COMPRESS_LEVEL_NONE);
/*ignored*/ GRPC_COMPRESS_LEVEL_NONE, false);
/* Channel default DEFLATE, call override to NONE (aka IDENTITY) */
request_with_payload_template(
config, "test_invoke_request_with_compressed_payload_md_override_3", 0,
GRPC_COMPRESS_DEFLATE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE,
GRPC_COMPRESS_NONE, &identity_compression_override, false,
/*ignored*/ GRPC_COMPRESS_LEVEL_NONE);
/*ignored*/ GRPC_COMPRESS_LEVEL_NONE, false);
}
static void test_invoke_request_with_disabled_algorithm(
@ -602,6 +638,7 @@ void compressed_payload(grpc_end2end_test_config config) {
test_invoke_request_with_exceptionally_uncompressed_payload(config);
test_invoke_request_with_uncompressed_payload(config);
test_invoke_request_with_compressed_payload(config);
test_invoke_request_with_send_message_before_initial_metadata(config);
test_invoke_request_with_server_level(config);
test_invoke_request_with_compressed_payload_md_override(config);
test_invoke_request_with_disabled_algorithm(config);

Loading…
Cancel
Save