Server: added the ability to disable compression algorithm

pull/2533/head
David Garcia Quintas 9 years ago
parent 47245f509c
commit beac88ca56
  1. 3
      include/grpc++/server.h
  2. 23
      include/grpc++/server_builder.h
  3. 21
      include/grpc/compression.h
  4. 27
      src/core/channel/channel_args.c
  5. 15
      src/core/channel/channel_args.h
  6. 28
      src/core/channel/compress_filter.c
  7. 23
      src/core/compression/algorithm.c
  8. 25
      src/cpp/server/server.cc
  9. 53
      src/cpp/server/server_builder.cc

@ -43,6 +43,7 @@
#include <grpc++/impl/grpc_library.h> #include <grpc++/impl/grpc_library.h>
#include <grpc++/impl/sync.h> #include <grpc++/impl/sync.h>
#include <grpc++/status.h> #include <grpc++/status.h>
#include <grpc/compression.h>
struct grpc_server; struct grpc_server;
@ -81,7 +82,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
// ServerBuilder use only // ServerBuilder use only
Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
int max_message_size); int max_message_size, grpc_compression_options compression_options);
// Register a service. This call does not take ownership of the service. // Register a service. This call does not take ownership of the service.
// The service must exist for the lifetime of the Server instance. // The service must exist for the lifetime of the Server instance.
bool RegisterService(const grpc::string *host, RpcService* service); bool RegisterService(const grpc::string *host, RpcService* service);

@ -37,6 +37,7 @@
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <grpc/compression.h>
#include <grpc++/config.h> #include <grpc++/config.h>
namespace grpc { namespace grpc {
@ -59,24 +60,24 @@ class ServerBuilder {
// The service must exist for the lifetime of the Server instance returned by // The service must exist for the lifetime of the Server instance returned by
// BuildAndStart(). // BuildAndStart().
// Matches requests with any :authority // Matches requests with any :authority
void RegisterService(SynchronousService* service); ServerBuilder& RegisterService(SynchronousService* service);
// Register an asynchronous service. // Register an asynchronous service.
// This call does not take ownership of the service or completion queue. // This call does not take ownership of the service or completion queue.
// The service and completion queuemust exist for the lifetime of the Server // The service and completion queuemust exist for the lifetime of the Server
// instance returned by BuildAndStart(). // instance returned by BuildAndStart().
// Matches requests with any :authority // Matches requests with any :authority
void RegisterAsyncService(AsynchronousService* service); ServerBuilder& RegisterAsyncService(AsynchronousService* service);
// Register a generic service. // Register a generic service.
// Matches requests with any :authority // Matches requests with any :authority
void RegisterAsyncGenericService(AsyncGenericService* service); ServerBuilder& RegisterAsyncGenericService(AsyncGenericService* service);
// Register a service. This call does not take ownership of the service. // Register a service. This call does not take ownership of the service.
// The service must exist for the lifetime of the Server instance returned by // The service must exist for the lifetime of the Server instance returned by
// BuildAndStart(). // BuildAndStart().
// Only matches requests with :authority \a host // Only matches requests with :authority \a host
void RegisterService(const grpc::string& host, ServerBuilder& RegisterService(const grpc::string& host,
SynchronousService* service); SynchronousService* service);
// Register an asynchronous service. // Register an asynchronous service.
@ -84,22 +85,23 @@ class ServerBuilder {
// The service and completion queuemust exist for the lifetime of the Server // The service and completion queuemust exist for the lifetime of the Server
// instance returned by BuildAndStart(). // instance returned by BuildAndStart().
// Only matches requests with :authority \a host // Only matches requests with :authority \a host
void RegisterAsyncService(const grpc::string& host, ServerBuilder& RegisterAsyncService(const grpc::string& host,
AsynchronousService* service); AsynchronousService* service);
// Set max message size in bytes. // Set max message size in bytes.
void SetMaxMessageSize(int max_message_size) { ServerBuilder& SetMaxMessageSize(int max_message_size);
max_message_size_ = max_message_size;
}
// Add a listening port. Can be called multiple times. // Add a listening port. Can be called multiple times.
void AddListeningPort(const grpc::string& addr, ServerBuilder& AddListeningPort(const grpc::string& addr,
std::shared_ptr<ServerCredentials> creds, std::shared_ptr<ServerCredentials> creds,
int* selected_port = nullptr); int* selected_port = nullptr);
// Set the thread pool used for running appliation rpc handlers. // Set the thread pool used for running appliation rpc handlers.
// Does not take ownership. // Does not take ownership.
void SetThreadPool(ThreadPoolInterface* thread_pool); ServerBuilder& SetThreadPool(ThreadPoolInterface* thread_pool);
// Set the compression options to be used by the server.
ServerBuilder& SetCompressionOptions(const grpc_compression_options& options);
// Add a completion queue for handling asynchronous services // Add a completion queue for handling asynchronous services
// Caller is required to keep this completion queue live until calling // Caller is required to keep this completion queue live until calling
@ -126,6 +128,7 @@ class ServerBuilder {
}; };
int max_message_size_; int max_message_size_;
grpc_compression_options compression_options_;
std::vector<std::unique_ptr<NamedService<RpcService>>> services_; std::vector<std::unique_ptr<NamedService<RpcService>>> services_;
std::vector<std::unique_ptr<NamedService<AsynchronousService>>> async_services_; std::vector<std::unique_ptr<NamedService<AsynchronousService>>> async_services_;
std::vector<Port> ports_; std::vector<Port> ports_;

@ -36,6 +36,8 @@
#include <stdlib.h> #include <stdlib.h>
#include <grpc/support/port_platform.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
@ -61,6 +63,11 @@ typedef enum {
GRPC_COMPRESS_LEVEL_COUNT GRPC_COMPRESS_LEVEL_COUNT
} grpc_compression_level; } grpc_compression_level;
typedef struct grpc_compression_options {
gpr_uint32 enabled_algorithms_bitset; /**< All algs are enabled by default */
grpc_compression_algorithm default_compression_algorithm; /**< for channel */
} grpc_compression_options;
/** Parses the first \a name_length bytes of \a name as a /** Parses the first \a name_length bytes of \a name as a
* grpc_compression_algorithm instance, updating \a algorithm. Returns 1 upon * grpc_compression_algorithm instance, updating \a algorithm. Returns 1 upon
* success, 0 otherwise. */ * success, 0 otherwise. */
@ -84,6 +91,20 @@ grpc_compression_level grpc_compression_level_for_algorithm(
grpc_compression_algorithm grpc_compression_algorithm_for_level( grpc_compression_algorithm grpc_compression_algorithm_for_level(
grpc_compression_level level); grpc_compression_level level);
void grpc_compression_options_init(grpc_compression_options *opts);
/** Mark \a algorithm as enabled in \a opts. */
void grpc_compression_options_enable_algorithm(
grpc_compression_options *opts, grpc_compression_algorithm algorithm);
/** Mark \a algorithm as disabled in \a opts. */
void grpc_compression_options_disable_algorithm(
grpc_compression_options *opts, grpc_compression_algorithm algorithm);
/** Returns true if \a algorithm is marked as enabled in \a opts. */
int grpc_compression_options_is_algorithm_enabled(
const grpc_compression_options *opts, grpc_compression_algorithm algorithm);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

@ -148,16 +148,19 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm(
return grpc_channel_args_copy_and_add(a, &tmp, 1); return grpc_channel_args_copy_and_add(a, &tmp, 1);
} }
/** Returns the compression algorithm's enabled states bitset from \a a. If not
* found, return a biset will all algorithms enabled */
static gpr_uint32 find_compression_algorithm_states_bitset( static gpr_uint32 find_compression_algorithm_states_bitset(
const grpc_channel_args *a) { const grpc_channel_args *a) {
size_t i; gpr_uint32 states_bitset = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
gpr_uint32 states_bitset = 0; if (a != NULL) {
if (a == NULL) return 0; size_t i;
for (i = 0; i < a->num_args; ++i) { for (i = 0; i < a->num_args; ++i) {
if (a->args[i].type == GRPC_ARG_INTEGER && if (a->args[i].type == GRPC_ARG_INTEGER &&
!strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) { !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) {
states_bitset = a->args[i].value.integer; states_bitset = a->args[i].value.integer;
break; break;
}
} }
} }
return states_bitset; return states_bitset;
@ -182,9 +185,7 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
return grpc_channel_args_copy_and_add(a, &tmp, 1); return grpc_channel_args_copy_and_add(a, &tmp, 1);
} }
int grpc_channel_args_compression_algorithm_get_state( int grpc_channel_args_compression_algorithm_get_states(
grpc_channel_args *a, const grpc_channel_args *a) {
grpc_compression_algorithm algorithm) { return find_compression_algorithm_states_bitset(a);
const gpr_uint32 states_bitset = find_compression_algorithm_states_bitset(a);
return GPR_BITGET(states_bitset, algorithm);
} }

@ -68,17 +68,20 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm(
grpc_channel_args *a, grpc_compression_algorithm algorithm); grpc_channel_args *a, grpc_compression_algorithm algorithm);
/** Sets the support for the given compression algorithm. By default, all /** Sets the support for the given compression algorithm. By default, all
* compression algorithms are enabled. Disabling an algorithm set by * compression algorithms are enabled. It's an error to disable an algorithm set
* grpc_channel_args_set_compression_algorithm disables compression altogether * by grpc_channel_args_set_compression_algorithm.
* */ * */
grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
grpc_channel_args *a, grpc_channel_args *a,
grpc_compression_algorithm algorithm, grpc_compression_algorithm algorithm,
int enabled); int enabled);
/** Returns the state (true for enabled, false for disabled) for \a algorithm */ /** Returns the bitset representing the support state (true for enabled, false
int grpc_channel_args_compression_algorithm_get_state( * for disabled) for compression algorithms.
grpc_channel_args *a, *
grpc_compression_algorithm algorithm); * The i-th bit of the returned bitset corresponds to the i-th entry in the
* grpc_compression_algorithm enum. */
int grpc_channel_args_compression_algorithm_get_states(
const grpc_channel_args *a);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */ #endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */

@ -70,6 +70,8 @@ typedef struct channel_data {
grpc_mdelem *mdelem_accept_encoding; grpc_mdelem *mdelem_accept_encoding;
/** The default, channel-level, compression algorithm */ /** The default, channel-level, compression algorithm */
grpc_compression_algorithm default_compression_algorithm; grpc_compression_algorithm default_compression_algorithm;
/** Compression options for the channel */
grpc_compression_options compression_options;
} channel_data; } channel_data;
/** Compress \a slices in place using \a algorithm. Returns 1 if compression did /** Compress \a slices in place using \a algorithm. Returns 1 if compression did
@ -102,7 +104,17 @@ static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
const char *md_c_str = grpc_mdstr_as_c_string(md->value); const char *md_c_str = grpc_mdstr_as_c_string(md->value);
if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
&calld->compression_algorithm)) { &calld->compression_algorithm)) {
gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'. Ignoring.", gpr_log(GPR_ERROR,
"Invalid compression algorithm: '%s' (unknown). Ignoring.",
md_c_str);
calld->compression_algorithm = GRPC_COMPRESS_NONE;
}
if (grpc_compression_options_is_algorithm_enabled(
&channeld->compression_options, calld->compression_algorithm) == 0)
{
gpr_log(GPR_ERROR,
"Invalid compression algorithm: '%s' (previously disabled). "
"Ignoring.",
md_c_str); md_c_str);
calld->compression_algorithm = GRPC_COMPRESS_NONE; calld->compression_algorithm = GRPC_COMPRESS_NONE;
} }
@ -297,8 +309,17 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
char *accept_encoding_str; char *accept_encoding_str;
size_t accept_encoding_str_len; size_t accept_encoding_str_len;
grpc_compression_options_init(&channeld->compression_options);
channeld->compression_options.enabled_algorithms_bitset =
grpc_channel_args_compression_algorithm_get_states(args);
channeld->default_compression_algorithm = channeld->default_compression_algorithm =
grpc_channel_args_get_compression_algorithm(args); grpc_channel_args_get_compression_algorithm(args);
/* Make sure the default isn't disabled. */
GPR_ASSERT(grpc_compression_options_is_algorithm_enabled(
&channeld->compression_options, channeld->default_compression_algorithm));
channeld->compression_options.default_compression_algorithm =
channeld->default_compression_algorithm;
channeld->mdstr_request_compression_algorithm_key = channeld->mdstr_request_compression_algorithm_key =
grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0); grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, 0);
@ -311,6 +332,11 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
char *algorithm_name; char *algorithm_name;
/* skip disabled algorithms */
if (grpc_compression_options_is_algorithm_enabled(
&channeld->compression_options, algo_idx) == 0) {
continue;
}
GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0); GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0);
channeld->mdelem_compression_algorithms[algo_idx] = channeld->mdelem_compression_algorithms[algo_idx] =
grpc_mdelem_from_metadata_strings( grpc_mdelem_from_metadata_strings(

@ -33,7 +33,9 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <grpc/compression.h> #include <grpc/compression.h>
#include <grpc/support/useful.h>
int grpc_compression_algorithm_parse(const char* name, size_t name_length, int grpc_compression_algorithm_parse(const char* name, size_t name_length,
grpc_compression_algorithm *algorithm) { grpc_compression_algorithm *algorithm) {
@ -102,3 +104,24 @@ grpc_compression_level grpc_compression_level_for_algorithm(
} }
abort(); abort();
} }
void grpc_compression_options_init(grpc_compression_options *opts) {
opts->enabled_algorithms_bitset = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT)-1;
opts->default_compression_algorithm = GRPC_COMPRESS_NONE;
}
void grpc_compression_options_enable_algorithm(
grpc_compression_options *opts, grpc_compression_algorithm algorithm) {
GPR_BITSET(&opts->enabled_algorithms_bitset, algorithm);
}
void grpc_compression_options_disable_algorithm(
grpc_compression_options *opts, grpc_compression_algorithm algorithm) {
GPR_BITCLEAR(&opts->enabled_algorithms_bitset, algorithm);
}
int grpc_compression_options_is_algorithm_enabled(
const grpc_compression_options *opts,
grpc_compression_algorithm algorithm) {
return GPR_BITGET(opts->enabled_algorithms_bitset, algorithm);
}

@ -163,27 +163,34 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_; grpc_completion_queue* cq_;
}; };
static grpc_server* CreateServer(int max_message_size) { static grpc_server* CreateServer(
int max_message_size, const grpc_compression_options& compression_options) {
if (max_message_size > 0) { if (max_message_size > 0) {
grpc_arg arg; grpc_arg args[2];
arg.type = GRPC_ARG_INTEGER; args[0].type = GRPC_ARG_INTEGER;
arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); args[0].key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
arg.value.integer = max_message_size; args[0].value.integer = max_message_size;
grpc_channel_args args = {1, &arg};
return grpc_server_create(&args); args[1].type = GRPC_ARG_INTEGER;
args[1].key = const_cast<char*>(GRPC_COMPRESSION_ALGORITHM_STATE_ARG);
args[1].value.integer = compression_options.enabled_algorithms_bitset;
grpc_channel_args channel_args = {2, args};
return grpc_server_create(&channel_args);
} else { } else {
return grpc_server_create(nullptr); return grpc_server_create(nullptr);
} }
} }
Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
int max_message_size) int max_message_size,
grpc_compression_options compression_options)
: max_message_size_(max_message_size), : max_message_size_(max_message_size),
started_(false), started_(false),
shutdown_(false), shutdown_(false),
num_running_cb_(0), num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>), sync_methods_(new std::list<SyncRequest>),
server_(CreateServer(max_message_size)), server_(CreateServer(max_message_size, compression_options)),
thread_pool_(thread_pool), thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) { thread_pool_owned_(thread_pool_owned) {
grpc_server_register_completion_queue(server_, cq_.cq()); grpc_server_register_completion_queue(server_, cq_.cq());

@ -42,7 +42,9 @@
namespace grpc { namespace grpc {
ServerBuilder::ServerBuilder() ServerBuilder::ServerBuilder()
: max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {} : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {
grpc_compression_options_init(&compression_options_);
}
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() { std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
ServerCompletionQueue* cq = new ServerCompletionQueue(); ServerCompletionQueue* cq = new ServerCompletionQueue();
@ -50,44 +52,65 @@ std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
return std::unique_ptr<ServerCompletionQueue>(cq); return std::unique_ptr<ServerCompletionQueue>(cq);
} }
void ServerBuilder::RegisterService(SynchronousService* service) { ServerBuilder& ServerBuilder::RegisterService(SynchronousService* service) {
services_.emplace_back(new NamedService<RpcService>(service->service())); services_.emplace_back(new NamedService<RpcService>(service->service()));
return *this;
} }
void ServerBuilder::RegisterAsyncService(AsynchronousService* service) { ServerBuilder& ServerBuilder::RegisterAsyncService(
AsynchronousService* service) {
async_services_.emplace_back(new NamedService<AsynchronousService>(service)); async_services_.emplace_back(new NamedService<AsynchronousService>(service));
return *this;
} }
void ServerBuilder::RegisterService( ServerBuilder& ServerBuilder::RegisterService(
const grpc::string& addr, SynchronousService* service) { const grpc::string& addr, SynchronousService* service) {
services_.emplace_back(new NamedService<RpcService>(addr, service->service())); services_.emplace_back(new NamedService<RpcService>(addr, service->service()));
return *this;
} }
void ServerBuilder::RegisterAsyncService( ServerBuilder& ServerBuilder::RegisterAsyncService(
const grpc::string& addr, AsynchronousService* service) { const grpc::string& addr, AsynchronousService* service) {
async_services_.emplace_back(new NamedService<AsynchronousService>(addr, service)); async_services_.emplace_back(
new NamedService<AsynchronousService>(addr, service));
return *this;
} }
void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) { ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
AsyncGenericService* service) {
if (generic_service_) { if (generic_service_) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"Adding multiple AsyncGenericService is unsupported for now. " "Adding multiple AsyncGenericService is unsupported for now. "
"Dropping the service %p", "Dropping the service %p",
service); service);
return; } else {
generic_service_ = service;
} }
generic_service_ = service; return *this;
}
ServerBuilder& ServerBuilder::SetMaxMessageSize(int max_message_size) {
max_message_size_ = max_message_size;
return *this;
} }
void ServerBuilder::AddListeningPort(const grpc::string& addr, ServerBuilder& ServerBuilder::AddListeningPort(const grpc::string& addr,
std::shared_ptr<ServerCredentials> creds, std::shared_ptr<ServerCredentials> creds,
int* selected_port) { int* selected_port) {
Port port = {addr, creds, selected_port}; Port port = {addr, creds, selected_port};
ports_.push_back(port); ports_.push_back(port);
return *this;
} }
void ServerBuilder::SetThreadPool(ThreadPoolInterface* thread_pool) { ServerBuilder& ServerBuilder::SetThreadPool(ThreadPoolInterface* thread_pool) {
thread_pool_ = thread_pool; thread_pool_ = thread_pool;
return *this;
}
ServerBuilder& ServerBuilder::SetCompressionOptions(
const grpc_compression_options& options) {
compression_options_ = options;
return *this;
} }
std::unique_ptr<Server> ServerBuilder::BuildAndStart() { std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
@ -100,8 +123,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
thread_pool_ = CreateDefaultThreadPool(); thread_pool_ = CreateDefaultThreadPool();
thread_pool_owned = true; thread_pool_owned = true;
} }
std::unique_ptr<Server> server( std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned,
new Server(thread_pool_, thread_pool_owned, max_message_size_)); max_message_size_,
compression_options_));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq()); grpc_server_register_completion_queue(server->server_, (*cq)->cq());
} }
@ -113,7 +137,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
} }
for (auto service = async_services_.begin(); for (auto service = async_services_.begin();
service != async_services_.end(); service++) { service != async_services_.end(); service++) {
if (!server->RegisterAsyncService((*service)->host.get(), (*service)->service)) { if (!server->RegisterAsyncService((*service)->host.get(),
(*service)->service)) {
return nullptr; return nullptr;
} }
} }

Loading…
Cancel
Save