Merge pull request #2533 from dgquintas/compression-accept-encoding

Enable servers to disable compression algorithms
pull/3325/head
Yang Gao 9 years ago
commit 2df0f39553
  1. 3
      include/grpc++/server.h
  2. 7
      include/grpc++/server_builder.h
  3. 37
      src/core/channel/compress_filter.c
  4. 23
      src/core/compression/algorithm.c
  5. 2
      src/core/surface/call.c
  6. 30
      src/cpp/server/server.cc
  7. 9
      src/cpp/server/server_builder.cc

@ -37,6 +37,7 @@
#include <list> #include <list>
#include <memory> #include <memory>
#include <grpc/compression.h>
#include <grpc++/completion_queue.h> #include <grpc++/completion_queue.h>
#include <grpc++/impl/call.h> #include <grpc++/impl/call.h>
#include <grpc++/impl/grpc_library.h> #include <grpc++/impl/grpc_library.h>
@ -99,7 +100,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
/// \param max_message_size Maximum message length that the channel can /// \param max_message_size Maximum message length that the channel can
/// receive. /// receive.
Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
int max_message_size); int max_message_size, grpc_compression_options compression_options);
/// Register a service. This call does not take ownership of the service. /// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance. /// The service must exist for the lifetime of the Server instance.

@ -37,6 +37,7 @@
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <grpc/compression.h>
#include <grpc++/support/config.h> #include <grpc++/support/config.h>
namespace grpc { namespace grpc {
@ -92,6 +93,11 @@ class ServerBuilder {
max_message_size_ = max_message_size; max_message_size_ = max_message_size;
} }
/// Set the compression options to be used by the server.
void SetCompressionOptions(const grpc_compression_options& options) {
compression_options_ = options;
}
/// Tries to bind \a server to the given \a addr. /// Tries to bind \a server to the given \a addr.
/// ///
/// It can be invoked multiple times. /// It can be invoked multiple times.
@ -133,6 +139,7 @@ class ServerBuilder {
}; };
int max_message_size_; int max_message_size_;
grpc_compression_options compression_options_;
std::vector<std::unique_ptr<NamedService<RpcService>>> services_; std::vector<std::unique_ptr<NamedService<RpcService>>> services_;
std::vector<std::unique_ptr<NamedService<AsynchronousService>>> std::vector<std::unique_ptr<NamedService<AsynchronousService>>>
async_services_; async_services_;

@ -70,6 +70,8 @@ typedef struct channel_data {
grpc_mdelem *mdelem_accept_encoding; grpc_mdelem *mdelem_accept_encoding;
/** The default, channel-level, compression algorithm */ /** The default, channel-level, compression algorithm */
grpc_compression_algorithm default_compression_algorithm; grpc_compression_algorithm default_compression_algorithm;
/** Compression options for the channel */
grpc_compression_options compression_options;
} channel_data; } channel_data;
/** Compress \a slices in place using \a algorithm. Returns 1 if compression did /** Compress \a slices in place using \a algorithm. Returns 1 if compression did
@ -102,7 +104,17 @@ static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
const char *md_c_str = grpc_mdstr_as_c_string(md->value); const char *md_c_str = grpc_mdstr_as_c_string(md->value);
if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
&calld->compression_algorithm)) { &calld->compression_algorithm)) {
gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'. Ignoring.", gpr_log(GPR_ERROR,
"Invalid compression algorithm: '%s' (unknown). Ignoring.",
md_c_str);
calld->compression_algorithm = GRPC_COMPRESS_NONE;
}
if (grpc_compression_options_is_algorithm_enabled(
&channeld->compression_options, calld->compression_algorithm) == 0)
{
gpr_log(GPR_ERROR,
"Invalid compression algorithm: '%s' (previously disabled). "
"Ignoring.",
md_c_str); md_c_str);
calld->compression_algorithm = GRPC_COMPRESS_NONE; calld->compression_algorithm = GRPC_COMPRESS_NONE;
} }
@ -294,11 +306,21 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
channel_data *channeld = elem->channel_data; channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx; grpc_compression_algorithm algo_idx;
const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1]; const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1];
size_t supported_algorithms_idx = 0;
char *accept_encoding_str; char *accept_encoding_str;
size_t accept_encoding_str_len; size_t accept_encoding_str_len;
grpc_compression_options_init(&channeld->compression_options);
channeld->compression_options.enabled_algorithms_bitset =
grpc_channel_args_compression_algorithm_get_states(args);
channeld->default_compression_algorithm = channeld->default_compression_algorithm =
grpc_channel_args_get_compression_algorithm(args); grpc_channel_args_get_compression_algorithm(args);
/* Make sure the default isn't disabled. */
GPR_ASSERT(grpc_compression_options_is_algorithm_enabled(
&channeld->compression_options, channeld->default_compression_algorithm));
channeld->compression_options.default_compression_algorithm =
channeld->default_compression_algorithm;
channeld->mdstr_request_compression_algorithm_key = channeld->mdstr_request_compression_algorithm_key =
grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0); grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0);
@ -311,6 +333,11 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
char *algorithm_name; char *algorithm_name;
/* skip disabled algorithms */
if (grpc_compression_options_is_algorithm_enabled(
&channeld->compression_options, algo_idx) == 0) {
continue;
}
GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0); GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0);
channeld->mdelem_compression_algorithms[algo_idx] = channeld->mdelem_compression_algorithms[algo_idx] =
grpc_mdelem_from_metadata_strings( grpc_mdelem_from_metadata_strings(
@ -318,15 +345,15 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key), GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key),
grpc_mdstr_from_string(mdctx, algorithm_name, 0)); grpc_mdstr_from_string(mdctx, algorithm_name, 0));
if (algo_idx > 0) { if (algo_idx > 0) {
supported_algorithms_names[algo_idx - 1] = algorithm_name; supported_algorithms_names[supported_algorithms_idx++] = algorithm_name;
} }
} }
/* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated
* arrays, as to avoid the heap allocs */ * arrays, as to avoid the heap allocs */
accept_encoding_str = gpr_strjoin_sep( accept_encoding_str =
supported_algorithms_names, GPR_ARRAY_SIZE(supported_algorithms_names), gpr_strjoin_sep(supported_algorithms_names, supported_algorithms_idx, ",",
", ", &accept_encoding_str_len); &accept_encoding_str_len);
channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings( channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings(
mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key), mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),

@ -33,7 +33,9 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <grpc/compression.h> #include <grpc/compression.h>
#include <grpc/support/useful.h>
int grpc_compression_algorithm_parse(const char *name, size_t name_length, int grpc_compression_algorithm_parse(const char *name, size_t name_length,
grpc_compression_algorithm *algorithm) { grpc_compression_algorithm *algorithm) {
@ -102,3 +104,24 @@ grpc_compression_level grpc_compression_level_for_algorithm(
} }
abort(); abort();
} }
void grpc_compression_options_init(grpc_compression_options *opts) {
opts->enabled_algorithms_bitset = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT)-1;
opts->default_compression_algorithm = GRPC_COMPRESS_NONE;
}
void grpc_compression_options_enable_algorithm(
grpc_compression_options *opts, grpc_compression_algorithm algorithm) {
GPR_BITSET(&opts->enabled_algorithms_bitset, algorithm);
}
void grpc_compression_options_disable_algorithm(
grpc_compression_options *opts, grpc_compression_algorithm algorithm) {
GPR_BITCLEAR(&opts->enabled_algorithms_bitset, algorithm);
}
int grpc_compression_options_is_algorithm_enabled(
const grpc_compression_options *opts,
grpc_compression_algorithm algorithm) {
return GPR_BITGET(opts->enabled_algorithms_bitset, algorithm);
}

@ -533,7 +533,7 @@ static void set_encodings_accepted_by_peer(
gpr_slice_buffer accept_encoding_parts; gpr_slice_buffer accept_encoding_parts;
gpr_slice_buffer_init(&accept_encoding_parts); gpr_slice_buffer_init(&accept_encoding_parts);
gpr_slice_split(accept_encoding_slice, ", ", &accept_encoding_parts); gpr_slice_split(accept_encoding_slice, ",", &accept_encoding_parts);
/* No need to zero call->encodings_accepted_by_peer: grpc_call_create already /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already
* zeroes the whole grpc_call */ * zeroes the whole grpc_call */

@ -252,28 +252,36 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_; grpc_completion_queue* cq_;
}; };
static grpc_server* CreateServer(int max_message_size) { static grpc_server* CreateServer(
int max_message_size, const grpc_compression_options& compression_options) {
grpc_arg args[2];
size_t args_idx = 0;
if (max_message_size > 0) { if (max_message_size > 0) {
grpc_arg arg; args[args_idx].type = GRPC_ARG_INTEGER;
arg.type = GRPC_ARG_INTEGER; args[args_idx].key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); args[args_idx].value.integer = max_message_size;
arg.value.integer = max_message_size; args_idx++;
grpc_channel_args args = {1, &arg};
return grpc_server_create(&args, nullptr);
} else {
return grpc_server_create(nullptr, nullptr);
} }
args[args_idx].type = GRPC_ARG_INTEGER;
args[args_idx].key = const_cast<char*>(GRPC_COMPRESSION_ALGORITHM_STATE_ARG);
args[args_idx].value.integer = compression_options.enabled_algorithms_bitset;
args_idx++;
grpc_channel_args channel_args = {args_idx, args};
return grpc_server_create(&channel_args, nullptr);
} }
Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
int max_message_size) int max_message_size,
grpc_compression_options compression_options)
: max_message_size_(max_message_size), : max_message_size_(max_message_size),
started_(false), started_(false),
shutdown_(false), shutdown_(false),
num_running_cb_(0), num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>), sync_methods_(new std::list<SyncRequest>),
has_generic_service_(false), has_generic_service_(false),
server_(CreateServer(max_message_size)), server_(CreateServer(max_message_size, compression_options)),
thread_pool_(thread_pool), thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) { thread_pool_owned_(thread_pool_owned) {
grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);

@ -43,7 +43,9 @@
namespace grpc { namespace grpc {
ServerBuilder::ServerBuilder() ServerBuilder::ServerBuilder()
: max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {} : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {
grpc_compression_options_init(&compression_options_);
}
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() { std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
ServerCompletionQueue* cq = new ServerCompletionQueue(); ServerCompletionQueue* cq = new ServerCompletionQueue();
@ -99,8 +101,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
thread_pool_ = CreateDefaultThreadPool(); thread_pool_ = CreateDefaultThreadPool();
thread_pool_owned = true; thread_pool_owned = true;
} }
std::unique_ptr<Server> server( std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned,
new Server(thread_pool_, thread_pool_owned, max_message_size_)); max_message_size_,
compression_options_));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq(), grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
nullptr); nullptr);

Loading…
Cancel
Save