WIP, code complete, missing tests. Asan passes.

pull/2533/head
David Garcia Quintas 10 years ago
parent 0324758dfc
commit b8edf7ed01
  1. 38
      src/core/channel/compress_filter.c
  2. 10
      src/core/compression/algorithm.c
  3. 34
      src/core/surface/call.c
  4. 9
      src/core/surface/channel.c
  5. 2
      src/core/surface/channel.h

@ -35,16 +35,20 @@
#include <string.h>
#include <grpc/compression.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/support/useful.h>
#include "src/core/channel/compress_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/compression/message_compress.h"
#include "src/core/support/string.h"
typedef struct call_data {
gpr_slice_buffer slices;
grpc_linked_mdelem compression_algorithm_storage;
grpc_linked_mdelem accept_encoding_storage;
int remaining_slice_bytes;
int seen_initial_metadata;
grpc_compression_algorithm compression_algorithm;
@ -54,7 +58,9 @@ typedef struct call_data {
typedef struct channel_data {
grpc_mdstr *mdstr_request_compression_algorithm_key;
grpc_mdstr *mdstr_outgoing_compression_algorithm_key;
grpc_mdstr *mdstr_compression_capabilities_key;
grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT];
grpc_mdelem *mdelem_accept_encoding;
grpc_compression_algorithm default_compression_algorithm;
} channel_data;
@ -126,6 +132,10 @@ static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
break;
case GRPC_OP_METADATA:
if (!calld->seen_initial_metadata) {
grpc_metadata_batch_add_head(
&(sop->data.metadata), &calld->accept_encoding_storage,
grpc_mdelem_ref(channeld->mdelem_accept_encoding));
grpc_metadata_batch_add_head(
&(sop->data.metadata), &calld->compression_algorithm_storage,
grpc_mdelem_ref(channeld->mdelem_compression_algorithms
@ -173,6 +183,10 @@ static void finish_not_compressed_sopb(grpc_stream_op_buffer *send_ops,
break;
case GRPC_OP_METADATA:
if (!calld->seen_initial_metadata) {
grpc_metadata_batch_add_head(
&(sop->data.metadata), &calld->accept_encoding_storage,
grpc_mdelem_ref(channeld->mdelem_accept_encoding));
grpc_metadata_batch_add_head(
&(sop->data.metadata), &calld->compression_algorithm_storage,
grpc_mdelem_ref(
@ -295,6 +309,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
int is_first, int is_last) {
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
const char* supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT-1];
char *accept_encoding_str;
size_t accept_encoding_str_len;
const grpc_compression_level clevel =
grpc_channel_args_get_compression_level(args);
@ -307,6 +324,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
channeld->mdstr_outgoing_compression_algorithm_key =
grpc_mdstr_from_string(mdctx, "grpc-encoding");
channeld->mdstr_compression_capabilities_key =
grpc_mdstr_from_string(mdctx, "grpc-accept-encoding");
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
char *algorith_name;
GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorith_name) != 0);
@ -315,8 +335,24 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
mdctx,
grpc_mdstr_ref(channeld->mdstr_outgoing_compression_algorithm_key),
grpc_mdstr_from_string(mdctx, algorith_name));
if (algo_idx > 0) {
supported_algorithms_names[algo_idx-1] = algorith_name;
}
}
accept_encoding_str =
gpr_strjoin_sep(supported_algorithms_names,
GPR_ARRAY_SIZE(supported_algorithms_names),
", ",
&accept_encoding_str_len);
channeld->mdelem_accept_encoding =
grpc_mdelem_from_metadata_strings(
mdctx,
grpc_mdstr_ref(channeld->mdstr_compression_capabilities_key),
grpc_mdstr_from_string(mdctx, accept_encoding_str));
gpr_free(accept_encoding_str);
GPR_ASSERT(!is_last);
}
@ -327,10 +363,12 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
grpc_mdstr_unref(channeld->mdstr_request_compression_algorithm_key);
grpc_mdstr_unref(channeld->mdstr_outgoing_compression_algorithm_key);
grpc_mdstr_unref(channeld->mdstr_compression_capabilities_key);
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
++algo_idx) {
grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]);
}
grpc_mdelem_unref(channeld->mdelem_accept_encoding);
}
const grpc_channel_filter grpc_compress_filter = {compress_start_transport_stream_op,

@ -37,11 +37,15 @@
int grpc_compression_algorithm_parse(const char* name,
grpc_compression_algorithm *algorithm) {
if (strcmp(name, "none") == 0) {
/* we use strncmp not only because it's safer (even though in this case it
* doesn't matter, given that we are comparing against string literals, but
* because this way we needn't have "name" nil-terminated (useful for slice
* data, for example) */
if (strncmp(name, "none", 4) == 0) {
*algorithm = GRPC_COMPRESS_NONE;
} else if (strcmp(name, "gzip") == 0) {
} else if (strncmp(name, "gzip", 4) == 0) {
*algorithm = GRPC_COMPRESS_GZIP;
} else if (strcmp(name, "deflate") == 0) {
} else if (strncmp(name, "deflate", 7) == 0) {
*algorithm = GRPC_COMPRESS_DEFLATE;
} else {
return 0;

@ -225,6 +225,9 @@ struct grpc_call {
/* Compression algorithm for the call */
grpc_compression_algorithm compression_algorithm;
/* Supported encodings (compression algorithms) */
gpr_uint8 accept_encoding[GRPC_COMPRESS_ALGORITHMS_COUNT];
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@ -433,15 +436,37 @@ static void set_compression_algorithm(grpc_call *call,
call->compression_algorithm = algo;
}
static void set_accept_encoding(grpc_call *call,
const gpr_slice accept_encoding_slice) {
size_t i;
grpc_compression_algorithm algorithm;
gpr_slice_buffer accept_encoding_parts;
gpr_slice_buffer_init(&accept_encoding_parts);
gpr_slice_split(accept_encoding_slice, ", ", &accept_encoding_parts);
memset(call->accept_encoding, 0, sizeof(call->accept_encoding));
for (i = 0; i < accept_encoding_parts.count; i++) {
const gpr_slice* slice = &accept_encoding_parts.slices[i];
if (grpc_compression_algorithm_parse(
(const char *)GPR_SLICE_START_PTR(*slice), &algorithm)) {
call->accept_encoding[algorithm] = 1; /* GPR_TRUE */
} else {
/* TODO(dgq): it'd be nice to have a slice-to-cstr function to easily
* print the offending entry */
gpr_log(GPR_ERROR,
"Invalid entry in accept encoding metadata. Ignoring.");
}
}
}
static void set_status_details(grpc_call *call, status_source source,
grpc_mdstr *status) {
if (call->status[source].details != NULL) {
grpc_mdstr_unref(call->status[source].details);
}
call->status[source].details = status;
}
static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
} static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
gpr_uint8 set = call->request_set[op];
reqinfo_master *master;
if (set >= GRPC_IOREQ_OP_COUNT) return 0;
@ -1279,6 +1304,9 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
} else if (key ==
grpc_channel_get_compression_algorithm_string(call->channel)) {
set_compression_algorithm(call, decode_compression(md));
} else if (key ==
grpc_channel_get_accept_encoding_string(call->channel)) {
set_accept_encoding(call, md->value->slice);
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {

@ -64,6 +64,7 @@ struct grpc_channel {
/** mdstr for the grpc-status key */
grpc_mdstr *grpc_status_string;
grpc_mdstr *grpc_compression_algorithm_string;
grpc_mdstr *grpc_accept_encoding_string;
grpc_mdstr *grpc_message_string;
grpc_mdstr *path_string;
grpc_mdstr *authority_string;
@ -99,6 +100,8 @@ grpc_channel *grpc_channel_create_from_filters(
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
channel->grpc_compression_algorithm_string =
grpc_mdstr_from_string(mdctx, "grpc-encoding");
channel->grpc_accept_encoding_string =
grpc_mdstr_from_string(mdctx, "grpc-accept-encoding");
channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
char buf[GPR_LTOA_MIN_BUFSIZE];
@ -209,6 +212,7 @@ static void destroy_channel(void *p, int ok) {
}
grpc_mdstr_unref(channel->grpc_status_string);
grpc_mdstr_unref(channel->grpc_compression_algorithm_string);
grpc_mdstr_unref(channel->grpc_accept_encoding_string);
grpc_mdstr_unref(channel->grpc_message_string);
grpc_mdstr_unref(channel->path_string);
grpc_mdstr_unref(channel->authority_string);
@ -266,6 +270,11 @@ grpc_mdstr *grpc_channel_get_compression_algorithm_string(
return channel->grpc_compression_algorithm_string;
}
grpc_mdstr *grpc_channel_get_accept_encoding_string(
grpc_channel *channel) {
return channel->grpc_accept_encoding_string;
}
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) {
return grpc_mdelem_ref(channel->grpc_status_elem[i]);

@ -56,6 +56,8 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_compression_algorithm_string(
grpc_channel *channel);
grpc_mdstr *grpc_channel_get_accept_encoding_string(
grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);

Loading…
Cancel
Save