diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 18a8017880c..210fe020ad1 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -37,6 +37,7 @@ #include #include +#include #include #include #include @@ -99,7 +100,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook { /// \param max_message_size Maximum message length that the channel can /// receive. Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, - int max_message_size); + int max_message_size, grpc_compression_options compression_options); /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 496e7862c53..05937f150bd 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -37,6 +37,7 @@ #include #include +#include #include namespace grpc { @@ -92,6 +93,11 @@ class ServerBuilder { max_message_size_ = max_message_size; } + /// Set the compression options to be used by the server. + void SetCompressionOptions(const grpc_compression_options& options) { + compression_options_ = options; + } + /// Tries to bind \a server to the given \a addr. /// /// It can be invoked multiple times. @@ -133,6 +139,7 @@ class ServerBuilder { }; int max_message_size_; + grpc_compression_options compression_options_; std::vector>> services_; std::vector>> async_services_; diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 762a4edc73c..eb27252d033 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -70,6 +70,8 @@ typedef struct channel_data { grpc_mdelem *mdelem_accept_encoding; /** The default, channel-level, compression algorithm */ grpc_compression_algorithm default_compression_algorithm; + /** Compression options for the channel */ + grpc_compression_options compression_options; } channel_data; /** Compress \a slices in place using \a algorithm. Returns 1 if compression did @@ -102,7 +104,17 @@ static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { const char *md_c_str = grpc_mdstr_as_c_string(md->value); if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), &calld->compression_algorithm)) { - gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'. Ignoring.", + gpr_log(GPR_ERROR, + "Invalid compression algorithm: '%s' (unknown). Ignoring.", + md_c_str); + calld->compression_algorithm = GRPC_COMPRESS_NONE; + } + if (grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, calld->compression_algorithm) == 0) + { + gpr_log(GPR_ERROR, + "Invalid compression algorithm: '%s' (previously disabled). " + "Ignoring.", md_c_str); calld->compression_algorithm = GRPC_COMPRESS_NONE; } @@ -294,11 +306,21 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1]; + size_t supported_algorithms_idx = 0; char *accept_encoding_str; size_t accept_encoding_str_len; + grpc_compression_options_init(&channeld->compression_options); + channeld->compression_options.enabled_algorithms_bitset = + grpc_channel_args_compression_algorithm_get_states(args); + channeld->default_compression_algorithm = grpc_channel_args_get_compression_algorithm(args); + /* Make sure the default isn't disabled. */ + GPR_ASSERT(grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, channeld->default_compression_algorithm)); + channeld->compression_options.default_compression_algorithm = + channeld->default_compression_algorithm; channeld->mdstr_request_compression_algorithm_key = grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0); @@ -311,6 +333,11 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { char *algorithm_name; + /* skip disabled algorithms */ + if (grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, algo_idx) == 0) { + continue; + } GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0); channeld->mdelem_compression_algorithms[algo_idx] = grpc_mdelem_from_metadata_strings( @@ -318,15 +345,15 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key), grpc_mdstr_from_string(mdctx, algorithm_name, 0)); if (algo_idx > 0) { - supported_algorithms_names[algo_idx - 1] = algorithm_name; + supported_algorithms_names[supported_algorithms_idx++] = algorithm_name; } } /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated * arrays, as to avoid the heap allocs */ - accept_encoding_str = gpr_strjoin_sep( - supported_algorithms_names, GPR_ARRAY_SIZE(supported_algorithms_names), - ", ", &accept_encoding_str_len); + accept_encoding_str = + gpr_strjoin_sep(supported_algorithms_names, supported_algorithms_idx, ",", + &accept_encoding_str_len); channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings( mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key), diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c index 6ed6dbe93f2..76d42fde0f7 100644 --- a/src/core/compression/algorithm.c +++ b/src/core/compression/algorithm.c @@ -33,7 +33,9 @@ #include #include + #include +#include int grpc_compression_algorithm_parse(const char *name, size_t name_length, grpc_compression_algorithm *algorithm) { @@ -102,3 +104,24 @@ grpc_compression_level grpc_compression_level_for_algorithm( } abort(); } + +void grpc_compression_options_init(grpc_compression_options *opts) { + opts->enabled_algorithms_bitset = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT)-1; + opts->default_compression_algorithm = GRPC_COMPRESS_NONE; +} + +void grpc_compression_options_enable_algorithm( + grpc_compression_options *opts, grpc_compression_algorithm algorithm) { + GPR_BITSET(&opts->enabled_algorithms_bitset, algorithm); +} + +void grpc_compression_options_disable_algorithm( + grpc_compression_options *opts, grpc_compression_algorithm algorithm) { + GPR_BITCLEAR(&opts->enabled_algorithms_bitset, algorithm); +} + +int grpc_compression_options_is_algorithm_enabled( + const grpc_compression_options *opts, + grpc_compression_algorithm algorithm) { + return GPR_BITGET(opts->enabled_algorithms_bitset, algorithm); +} diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 6ece56371b4..40190c2f542 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -533,7 +533,7 @@ static void set_encodings_accepted_by_peer( gpr_slice_buffer accept_encoding_parts; gpr_slice_buffer_init(&accept_encoding_parts); - gpr_slice_split(accept_encoding_slice, ", ", &accept_encoding_parts); + gpr_slice_split(accept_encoding_slice, ",", &accept_encoding_parts); /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already * zeroes the whole grpc_call */ diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index d67205e822f..a3020c342b4 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -252,28 +252,36 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; -static grpc_server* CreateServer(int max_message_size) { +static grpc_server* CreateServer( + int max_message_size, const grpc_compression_options& compression_options) { + grpc_arg args[2]; + size_t args_idx = 0; if (max_message_size > 0) { - grpc_arg arg; - arg.type = GRPC_ARG_INTEGER; - arg.key = const_cast(GRPC_ARG_MAX_MESSAGE_LENGTH); - arg.value.integer = max_message_size; - grpc_channel_args args = {1, &arg}; - return grpc_server_create(&args, nullptr); - } else { - return grpc_server_create(nullptr, nullptr); + args[args_idx].type = GRPC_ARG_INTEGER; + args[args_idx].key = const_cast(GRPC_ARG_MAX_MESSAGE_LENGTH); + args[args_idx].value.integer = max_message_size; + args_idx++; } + + args[args_idx].type = GRPC_ARG_INTEGER; + args[args_idx].key = const_cast(GRPC_COMPRESSION_ALGORITHM_STATE_ARG); + args[args_idx].value.integer = compression_options.enabled_algorithms_bitset; + args_idx++; + + grpc_channel_args channel_args = {args_idx, args}; + return grpc_server_create(&channel_args, nullptr); } Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, - int max_message_size) + int max_message_size, + grpc_compression_options compression_options) : max_message_size_(max_message_size), started_(false), shutdown_(false), num_running_cb_(0), sync_methods_(new std::list), has_generic_service_(false), - server_(CreateServer(max_message_size)), + server_(CreateServer(max_message_size, compression_options)), thread_pool_(thread_pool), thread_pool_owned_(thread_pool_owned) { grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 99bc8147a0d..1c7e4e4eb66 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -43,7 +43,9 @@ namespace grpc { ServerBuilder::ServerBuilder() - : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {} + : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) { + grpc_compression_options_init(&compression_options_); +} std::unique_ptr ServerBuilder::AddCompletionQueue() { ServerCompletionQueue* cq = new ServerCompletionQueue(); @@ -99,8 +101,9 @@ std::unique_ptr ServerBuilder::BuildAndStart() { thread_pool_ = CreateDefaultThreadPool(); thread_pool_owned = true; } - std::unique_ptr server( - new Server(thread_pool_, thread_pool_owned, max_message_size_)); + std::unique_ptr server(new Server(thread_pool_, thread_pool_owned, + max_message_size_, + compression_options_)); for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { grpc_server_register_completion_queue(server->server_, (*cq)->cq(), nullptr);