Moar tests, fixed wrongly named vbles, minor bugs.

pull/2135/head
David Garcia Quintas 10 years ago
parent 331d2da2a7
commit fc0fa3381c
  1. 21
      include/grpc/compression.h
  2. 26
      src/core/channel/compress_filter.c
  3. 46
      src/core/compression/algorithm.c
  4. 26
      src/core/surface/call.c
  5. 5
      src/core/surface/channel.c
  6. 4
      test/core/compression/message_compress_test.c
  7. 62
      test/core/end2end/tests/request_with_compressed_payload.c

@ -50,12 +50,29 @@ typedef enum {
GRPC_COMPRESS_LEVEL_NONE = 0,
GRPC_COMPRESS_LEVEL_LOW,
GRPC_COMPRESS_LEVEL_MED,
GRPC_COMPRESS_LEVEL_HIGH
GRPC_COMPRESS_LEVEL_HIGH,
GRPC_COMPRESS_LEVEL_COUNT
} grpc_compression_level;
const char *grpc_compression_algorithm_name(
/** Parses \a name as a grpc_compression_algorithm instance, updating \a
* algorithm. Returns 1 upon success, 0 otherwise. */
int grpc_compression_algorithm_parse(const char *name,
grpc_compression_algorithm *algorithm);
/** Updates \a name with the encoding name corresponding to a valid \a
* algorithm. Returns 1 upon success, 0 otherwise. */
int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm,
char **name);
/** Returns the compression level corresponding to \a algorithm.
*
* It abort()s for unknown algorithms. */
grpc_compression_level grpc_compression_level_for_algorithm(
grpc_compression_algorithm algorithm);
/** Returns the compression algorithm corresponding to \a level.
*
* It abort()s for unknown levels . */
grpc_compression_algorithm grpc_compression_algorithm_for_level(
grpc_compression_level level);

@ -80,10 +80,13 @@ static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
channel_data *channeld = elem->channel_data;
if (md->key == channeld->mdstr_compression_algorithm_key) {
assert(GPR_SLICE_LENGTH(md->value->slice) ==
sizeof(grpc_compression_algorithm));
memcpy(&calld->compression_algorithm, GPR_SLICE_START_PTR(md->value->slice),
sizeof(grpc_compression_algorithm));
const char *md_c_str = grpc_mdstr_as_c_string(md->value);
if (!grpc_compression_algorithm_parse(md_c_str,
&calld->compression_algorithm)) {
gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'. Ignoring.",
md_c_str);
calld->compression_algorithm = GRPC_COMPRESS_NONE;
}
calld->has_compression_algorithm = 1;
return NULL;
}
@ -92,9 +95,11 @@ static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
}
static int skip_compression(channel_data *channeld, call_data *calld) {
if (calld->has_compression_algorithm &&
(calld->compression_algorithm == GRPC_COMPRESS_NONE)) {
return 1;
if (calld->has_compression_algorithm) {
if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
return 1;
}
return 0; /* we have an actual call-specific algorithm */
}
/* no per-call compression override */
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
@ -255,14 +260,15 @@ static void init_channel_elem(grpc_channel_element *elem,
grpc_compression_algorithm_for_level(clevel);
channeld->mdstr_compression_algorithm_key =
grpc_mdstr_from_string(mdctx, "grpc-compression-algorithm");
grpc_mdstr_from_string(mdctx, "grpc-encoding");
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
char *algorith_name;
GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorith_name) != 0);
channeld->mdelem_compression_algorithms[algo_idx] =
grpc_mdelem_from_metadata_strings(
mdctx, grpc_mdstr_ref(channeld->mdstr_compression_algorithm_key),
grpc_mdstr_from_buffer(mdctx, (gpr_uint8 *)&algo_idx,
sizeof(algo_idx)));
grpc_mdstr_from_string(mdctx, algorith_name));
}
/* The first and the last filters tend to be implemented differently to

@ -32,21 +32,39 @@
*/
#include <stdlib.h>
#include <string.h>
#include <grpc/compression.h>
const char *grpc_compression_algorithm_name(
grpc_compression_algorithm algorithm) {
int grpc_compression_algorithm_parse(const char* name,
grpc_compression_algorithm *algorithm) {
if (strcmp(name, "none") == 0) {
*algorithm = GRPC_COMPRESS_NONE;
} else if (strcmp(name, "gzip") == 0) {
*algorithm = GRPC_COMPRESS_GZIP;
} else if (strcmp(name, "deflate") == 0) {
*algorithm = GRPC_COMPRESS_DEFLATE;
} else {
return 0;
}
return 1;
}
int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm,
char **name) {
switch (algorithm) {
case GRPC_COMPRESS_NONE:
return "none";
*name = "none";
break;
case GRPC_COMPRESS_DEFLATE:
return "deflate";
*name = "deflate";
break;
case GRPC_COMPRESS_GZIP:
return "gzip";
case GRPC_COMPRESS_ALGORITHMS_COUNT:
return "error";
*name = "gzip";
break;
default:
return 0;
}
return "error";
return 1;
}
/* TODO(dgq): Add the ability to specify parameters to the individual
@ -65,3 +83,15 @@ grpc_compression_algorithm grpc_compression_algorithm_for_level(
abort();
}
}
grpc_compression_level grpc_compression_level_for_algorithm(
grpc_compression_algorithm algorithm) {
grpc_compression_level clevel;
for (clevel = GRPC_COMPRESS_LEVEL_NONE; clevel < GRPC_COMPRESS_LEVEL_COUNT;
++clevel) {
if (grpc_compression_algorithm_for_level(clevel) == algorithm) {
return clevel;
}
}
abort();
}

@ -724,9 +724,14 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
(call->compression_algorithm == GRPC_COMPRESS_NONE)) {
char *message = NULL;
gpr_asprintf(
&message, "Invalid compression algorithm (%s) for compressed message.",
grpc_compression_algorithm_name(call->compression_algorithm));
char *alg_name;
if (!grpc_compression_algorithm_name(call->compression_algorithm, &alg_name)) {
/* This shouldn't happen, other than due to data corruption */
alg_name = "<unknown>";
}
gpr_asprintf(&message,
"Invalid compression algorithm (%s) for compressed message.",
alg_name);
cancel_with_status(call, GRPC_STATUS_FAILED_PRECONDITION, message);
}
/* stash away parameters, and prepare for incoming slices */
@ -1206,17 +1211,20 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
static void destroy_compression(void *ignored) {}
static gpr_uint32 decode_compression(grpc_mdelem *md) {
grpc_compression_level clevel;
grpc_compression_algorithm algorithm;
void *user_data = grpc_mdelem_get_user_data(md, destroy_compression);
if (user_data) {
clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
algorithm = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
} else {
GPR_ASSERT(sizeof(clevel) == GPR_SLICE_LENGTH(md->value->slice));
memcpy(&clevel, GPR_SLICE_START_PTR(md->value->slice), sizeof(clevel));
const char *md_c_str = grpc_mdstr_as_c_string(md->value);
if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) {
gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
assert(0);
}
grpc_mdelem_set_user_data(md, destroy_compression,
(void *)(gpr_intptr)(clevel + COMPRESS_OFFSET));
(void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET));
}
return clevel;
return algorithm;
}
static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {

@ -100,7 +100,7 @@ grpc_channel *grpc_channel_create_from_filters(
channel->metadata_context = mdctx;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
channel->grpc_compression_algorithm_string =
grpc_mdstr_from_string(mdctx, "grpc-compression-algorithm");
grpc_mdstr_from_string(mdctx, "grpc-encoding");
channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
char buf[GPR_LTOA_MIN_BUFSIZE];
@ -273,7 +273,8 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
return channel->grpc_status_string;
}
grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(grpc_channel *channel) {
grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(
grpc_channel *channel) {
return channel->grpc_compression_algorithm_string;
}

@ -61,13 +61,15 @@ static void assert_passthrough(gpr_slice value,
gpr_slice_buffer output;
gpr_slice final;
int was_compressed;
char *algorithm_name;
GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algorithm_name) != 0);
gpr_log(GPR_INFO,
"assert_passthrough: value_length=%d value_hash=0x%08x "
"algorithm='%s' uncompressed_split='%s' compressed_split='%s'",
GPR_SLICE_LENGTH(value), gpr_murmur_hash3(GPR_SLICE_START_PTR(value),
GPR_SLICE_LENGTH(value), 0),
grpc_compression_algorithm_name(algorithm),
algorithm_name,
grpc_slice_split_mode_name(uncompressed_split_mode),
grpc_slice_split_mode_name(compressed_split_mode));

@ -104,7 +104,8 @@ static void request_with_payload_template(
grpc_end2end_test_config config, const char *test_name,
gpr_uint32 send_flags_bitmask,
grpc_compression_level requested_compression_level,
grpc_compression_algorithm expected_compression_algorithm) {
grpc_compression_algorithm expected_compression_algorithm,
grpc_metadata *client_metadata) {
grpc_call *c;
grpc_call *s;
gpr_slice request_payload_slice;
@ -125,8 +126,9 @@ static void request_with_payload_template(
size_t details_capacity = 0;
int was_cancelled = 2;
cq_verifier *cqv;
char str[1024];
char str[1024]; memset(&str[0], 1023, 'x'); str[1023] = '\0';
memset(&str[0], 1023, 'x'); str[1023] = '\0';
request_payload_slice = gpr_slice_from_copied_string(str);
request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
@ -149,7 +151,12 @@ static void request_with_payload_template(
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
if (client_metadata != NULL) {
op->data.send_initial_metadata.count = 1;
op->data.send_initial_metadata.metadata = client_metadata;
} else {
op->data.send_initial_metadata.count = 0;
}
op->flags = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
@ -247,7 +254,16 @@ static void test_invoke_request_with_exceptionally_uncompressed_payload(
grpc_end2end_test_config config) {
request_with_payload_template(
config, "test_invoke_request_with_exceptionally_uncompressed_payload",
GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_LEVEL_HIGH, GRPC_COMPRESS_NONE);
GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_LEVEL_HIGH, GRPC_COMPRESS_NONE,
NULL);
}
static void test_invoke_request_with_uncompressed_payload(
grpc_end2end_test_config config) {
request_with_payload_template(
config, "test_invoke_request_with_uncompressed_payload", 0,
GRPC_COMPRESS_LEVEL_NONE,
grpc_compression_algorithm_for_level(GRPC_COMPRESS_LEVEL_NONE), NULL);
}
static void test_invoke_request_with_compressed_payload(
@ -255,19 +271,47 @@ static void test_invoke_request_with_compressed_payload(
request_with_payload_template(
config, "test_invoke_request_with_compressed_payload", 0,
GRPC_COMPRESS_LEVEL_HIGH,
grpc_compression_algorithm_for_level(GRPC_COMPRESS_LEVEL_HIGH));
grpc_compression_algorithm_for_level(GRPC_COMPRESS_LEVEL_HIGH), NULL);
}
static void test_invoke_request_with_uncompressed_payload(
static void test_invoke_request_with_compressed_payload_md_override(
grpc_end2end_test_config config) {
grpc_metadata gzip_compression_override;
grpc_metadata none_compression_override;
gzip_compression_override.key = "grpc-encoding";
gzip_compression_override.value = "gzip";
gzip_compression_override.value_length = 4;
memset(&gzip_compression_override.internal_data, 0,
sizeof(gzip_compression_override.internal_data));
none_compression_override.key = "grpc-encoding";
none_compression_override.value = "none";
none_compression_override.value_length = 4;
memset(&none_compression_override.internal_data, 0,
sizeof(none_compression_override.internal_data));
/* Channel default NONE, call override to GZIP */
request_with_payload_template(
config, "test_invoke_request_with_uncompressed_payload", 0,
GRPC_COMPRESS_LEVEL_NONE,
grpc_compression_algorithm_for_level(GRPC_COMPRESS_LEVEL_NONE));
config, "test_invoke_request_with_compressed_payload_md_override_1", 0,
GRPC_COMPRESS_LEVEL_NONE, GRPC_COMPRESS_GZIP, &gzip_compression_override);
/* Channel default DEFLATE, call override to GZIP */
request_with_payload_template(
config, "test_invoke_request_with_compressed_payload_md_override_2", 0,
grpc_compression_level_for_algorithm(GRPC_COMPRESS_DEFLATE),
GRPC_COMPRESS_GZIP, &gzip_compression_override);
/* Channel default DEFLATE, call override to NONE */
request_with_payload_template(
config, "test_invoke_request_with_compressed_payload_md_override_3", 0,
grpc_compression_level_for_algorithm(GRPC_COMPRESS_DEFLATE),
GRPC_COMPRESS_NONE, &none_compression_override);
}
void grpc_end2end_tests(grpc_end2end_test_config config) {
test_invoke_request_with_exceptionally_uncompressed_payload(config);
test_invoke_request_with_uncompressed_payload(config);
test_invoke_request_with_compressed_payload(config);
test_invoke_request_with_compressed_payload_md_override(config);
}

Loading…
Cancel
Save