|
|
|
@ -35,22 +35,25 @@ |
|
|
|
|
#include <string.h> |
|
|
|
|
|
|
|
|
|
#include <grpc/compression.h> |
|
|
|
|
#include <grpc/support/alloc.h> |
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
#include <grpc/support/slice_buffer.h> |
|
|
|
|
|
|
|
|
|
#include "src/core/channel/compress_filter.h" |
|
|
|
|
#include "src/core/channel/channel_args.h" |
|
|
|
|
#include "src/core/compression/message_compress.h" |
|
|
|
|
#include "src/core/support/string.h" |
|
|
|
|
|
|
|
|
|
typedef struct call_data { |
|
|
|
|
gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */ |
|
|
|
|
grpc_linked_mdelem compression_algorithm_storage; |
|
|
|
|
grpc_linked_mdelem accept_encoding_storage; |
|
|
|
|
int remaining_slice_bytes; /**< Input data to be read, as per BEGIN_MESSAGE */ |
|
|
|
|
int written_initial_metadata; /**< Already processed initial md? */ |
|
|
|
|
/** Compression algorithm we'll try to use. It may be given by incoming
|
|
|
|
|
* metadata, or by the channel's default compression settings. */ |
|
|
|
|
grpc_compression_algorithm compression_algorithm; |
|
|
|
|
/** If true, contents of \a compression_algorithm are authoritative */ |
|
|
|
|
/** If true, contents of \a compression_algorithm are authoritative */ |
|
|
|
|
int has_compression_algorithm; |
|
|
|
|
} call_data; |
|
|
|
|
|
|
|
|
@ -59,8 +62,12 @@ typedef struct channel_data { |
|
|
|
|
grpc_mdstr *mdstr_request_compression_algorithm_key; |
|
|
|
|
/** Metadata key for the outgoing (used) compression algorithm */ |
|
|
|
|
grpc_mdstr *mdstr_outgoing_compression_algorithm_key; |
|
|
|
|
/** Metadata key for the accepted encodings */ |
|
|
|
|
grpc_mdstr *mdstr_compression_capabilities_key; |
|
|
|
|
/** Precomputed metadata elements for all available compression algorithms */ |
|
|
|
|
grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT]; |
|
|
|
|
/** Precomputed metadata elements for the accepted encodings */ |
|
|
|
|
grpc_mdelem *mdelem_accept_encoding; |
|
|
|
|
/** The default, channel-level, compression algorithm */ |
|
|
|
|
grpc_compression_algorithm default_compression_algorithm; |
|
|
|
|
} channel_data; |
|
|
|
@ -71,7 +78,7 @@ typedef struct channel_data { |
|
|
|
|
* |
|
|
|
|
* Returns 1 if the data was actually compress and 0 otherwise. */ |
|
|
|
|
static int compress_send_sb(grpc_compression_algorithm algorithm, |
|
|
|
|
gpr_slice_buffer *slices) { |
|
|
|
|
gpr_slice_buffer *slices) { |
|
|
|
|
int did_compress; |
|
|
|
|
gpr_slice_buffer tmp; |
|
|
|
|
gpr_slice_buffer_init(&tmp); |
|
|
|
@ -86,14 +93,14 @@ static int compress_send_sb(grpc_compression_algorithm algorithm, |
|
|
|
|
/** For each \a md element from the incoming metadata, filter out the entry for
|
|
|
|
|
* "grpc-encoding", using its value to populate the call data's |
|
|
|
|
* compression_algorithm field. */ |
|
|
|
|
static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) { |
|
|
|
|
static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { |
|
|
|
|
grpc_call_element *elem = user_data; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
channel_data *channeld = elem->channel_data; |
|
|
|
|
|
|
|
|
|
if (md->key == channeld->mdstr_request_compression_algorithm_key) { |
|
|
|
|
const char *md_c_str = grpc_mdstr_as_c_string(md->value); |
|
|
|
|
if (!grpc_compression_algorithm_parse(md_c_str, |
|
|
|
|
if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), |
|
|
|
|
&calld->compression_algorithm)) { |
|
|
|
|
gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'. Ignoring.", |
|
|
|
|
md_c_str); |
|
|
|
@ -108,10 +115,10 @@ static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) { |
|
|
|
|
|
|
|
|
|
static int skip_compression(channel_data *channeld, call_data *calld) { |
|
|
|
|
if (calld->has_compression_algorithm) { |
|
|
|
|
if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { |
|
|
|
|
return 1; |
|
|
|
|
} |
|
|
|
|
return 0; /* we have an actual call-specific algorithm */ |
|
|
|
|
if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { |
|
|
|
|
return 1; |
|
|
|
|
} |
|
|
|
|
return 0; /* we have an actual call-specific algorithm */ |
|
|
|
|
} |
|
|
|
|
/* no per-call compression override */ |
|
|
|
|
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; |
|
|
|
@ -184,7 +191,7 @@ static void process_send_ops(grpc_call_element *elem, |
|
|
|
|
* given by GRPC_OP_BEGIN_MESSAGE) */ |
|
|
|
|
calld->remaining_slice_bytes = sop->data.begin_message.length; |
|
|
|
|
if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) { |
|
|
|
|
calld->has_compression_algorithm = 1; /* GPR_TRUE */ |
|
|
|
|
calld->has_compression_algorithm = 1; /* GPR_TRUE */ |
|
|
|
|
calld->compression_algorithm = GRPC_COMPRESS_NONE; |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
@ -202,10 +209,17 @@ static void process_send_ops(grpc_call_element *elem, |
|
|
|
|
channeld->default_compression_algorithm; |
|
|
|
|
calld->has_compression_algorithm = 1; /* GPR_TRUE */ |
|
|
|
|
} |
|
|
|
|
/* hint compression algorithm */ |
|
|
|
|
grpc_metadata_batch_add_tail( |
|
|
|
|
&(sop->data.metadata), &calld->compression_algorithm_storage, |
|
|
|
|
GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms |
|
|
|
|
[calld->compression_algorithm])); |
|
|
|
|
|
|
|
|
|
/* convey supported compression algorithms */ |
|
|
|
|
grpc_metadata_batch_add_head( |
|
|
|
|
&(sop->data.metadata), &calld->accept_encoding_storage, |
|
|
|
|
GRPC_MDELEM_REF(channeld->mdelem_accept_encoding)); |
|
|
|
|
|
|
|
|
|
calld->written_initial_metadata = 1; /* GPR_TRUE */ |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
@ -279,6 +293,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, |
|
|
|
|
int is_first, int is_last) { |
|
|
|
|
channel_data *channeld = elem->channel_data; |
|
|
|
|
grpc_compression_algorithm algo_idx; |
|
|
|
|
const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1]; |
|
|
|
|
char *accept_encoding_str; |
|
|
|
|
size_t accept_encoding_str_len; |
|
|
|
|
|
|
|
|
|
channeld->default_compression_algorithm = |
|
|
|
|
grpc_channel_args_get_compression_algorithm(args); |
|
|
|
@ -289,6 +306,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, |
|
|
|
|
channeld->mdstr_outgoing_compression_algorithm_key = |
|
|
|
|
grpc_mdstr_from_string(mdctx, "grpc-encoding", 0); |
|
|
|
|
|
|
|
|
|
channeld->mdstr_compression_capabilities_key = |
|
|
|
|
grpc_mdstr_from_string(mdctx, "grpc-accept-encoding", 0); |
|
|
|
|
|
|
|
|
|
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { |
|
|
|
|
char *algorithm_name; |
|
|
|
|
GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0); |
|
|
|
@ -297,8 +317,22 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, |
|
|
|
|
mdctx, |
|
|
|
|
GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key), |
|
|
|
|
grpc_mdstr_from_string(mdctx, algorithm_name, 0)); |
|
|
|
|
if (algo_idx > 0) { |
|
|
|
|
supported_algorithms_names[algo_idx - 1] = algorithm_name; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated
|
|
|
|
|
* arrays, as to avoid the heap allocs */ |
|
|
|
|
accept_encoding_str = gpr_strjoin_sep( |
|
|
|
|
supported_algorithms_names, GPR_ARRAY_SIZE(supported_algorithms_names), |
|
|
|
|
", ", &accept_encoding_str_len); |
|
|
|
|
|
|
|
|
|
channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings( |
|
|
|
|
mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key), |
|
|
|
|
grpc_mdstr_from_string(mdctx, accept_encoding_str, 0)); |
|
|
|
|
gpr_free(accept_encoding_str); |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(!is_last); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -309,10 +343,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) { |
|
|
|
|
|
|
|
|
|
GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key); |
|
|
|
|
GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key); |
|
|
|
|
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; |
|
|
|
|
++algo_idx) { |
|
|
|
|
GRPC_MDSTR_UNREF(channeld->mdstr_compression_capabilities_key); |
|
|
|
|
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { |
|
|
|
|
GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]); |
|
|
|
|
} |
|
|
|
|
GRPC_MDELEM_UNREF(channeld->mdelem_accept_encoding); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const grpc_channel_filter grpc_compress_filter = { |
|
|
|
|