diff --git a/BUILD b/BUILD index f630336bb49..6ae1246d906 100644 --- a/BUILD +++ b/BUILD @@ -315,7 +315,7 @@ cc_library( "src/core/lib/channel/connected_channel.c", "src/core/lib/channel/http_client_filter.c", "src/core/lib/channel/http_server_filter.c", - "src/core/lib/compression/compression_algorithm.c", + "src/core/lib/compression/compression.c", "src/core/lib/compression/message_compress.c", "src/core/lib/debug/trace.c", "src/core/lib/http/format_request.c", @@ -681,7 +681,7 @@ cc_library( "src/core/lib/channel/connected_channel.c", "src/core/lib/channel/http_client_filter.c", "src/core/lib/channel/http_server_filter.c", - "src/core/lib/compression/compression_algorithm.c", + "src/core/lib/compression/compression.c", "src/core/lib/compression/message_compress.c", "src/core/lib/debug/trace.c", "src/core/lib/http/format_request.c", @@ -1010,7 +1010,7 @@ cc_library( "src/core/lib/channel/connected_channel.c", "src/core/lib/channel/http_client_filter.c", "src/core/lib/channel/http_server_filter.c", - "src/core/lib/compression/compression_algorithm.c", + "src/core/lib/compression/compression.c", "src/core/lib/compression/message_compress.c", "src/core/lib/debug/trace.c", "src/core/lib/http/format_request.c", @@ -1708,7 +1708,7 @@ objc_library( "src/core/lib/channel/connected_channel.c", "src/core/lib/channel/http_client_filter.c", "src/core/lib/channel/http_server_filter.c", - "src/core/lib/compression/compression_algorithm.c", + "src/core/lib/compression/compression.c", "src/core/lib/compression/message_compress.c", "src/core/lib/debug/trace.c", "src/core/lib/http/format_request.c", diff --git a/Makefile b/Makefile index a545b527173..db98f3f94ce 100644 --- a/Makefile +++ b/Makefile @@ -2487,7 +2487,7 @@ LIBGRPC_SRC = \ src/core/lib/channel/connected_channel.c \ src/core/lib/channel/http_client_filter.c \ src/core/lib/channel/http_server_filter.c \ - src/core/lib/compression/compression_algorithm.c \ + src/core/lib/compression/compression.c \ src/core/lib/compression/message_compress.c \ src/core/lib/debug/trace.c \ src/core/lib/http/format_request.c \ @@ -2753,7 +2753,7 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/channel/connected_channel.c \ src/core/lib/channel/http_client_filter.c \ src/core/lib/channel/http_server_filter.c \ - src/core/lib/compression/compression_algorithm.c \ + src/core/lib/compression/compression.c \ src/core/lib/compression/message_compress.c \ src/core/lib/debug/trace.c \ src/core/lib/http/format_request.c \ @@ -3088,7 +3088,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/channel/connected_channel.c \ src/core/lib/channel/http_client_filter.c \ src/core/lib/channel/http_server_filter.c \ - src/core/lib/compression/compression_algorithm.c \ + src/core/lib/compression/compression.c \ src/core/lib/compression/message_compress.c \ src/core/lib/debug/trace.c \ src/core/lib/http/format_request.c \ diff --git a/binding.gyp b/binding.gyp index 80077a99bd8..9b4ae5eb427 100644 --- a/binding.gyp +++ b/binding.gyp @@ -571,7 +571,7 @@ 'src/core/lib/channel/connected_channel.c', 'src/core/lib/channel/http_client_filter.c', 'src/core/lib/channel/http_server_filter.c', - 'src/core/lib/compression/compression_algorithm.c', + 'src/core/lib/compression/compression.c', 'src/core/lib/compression/message_compress.c', 'src/core/lib/debug/trace.c', 'src/core/lib/http/format_request.c', diff --git a/build.yaml b/build.yaml index 96157cbdcc9..a94fb052a5d 100644 --- a/build.yaml +++ b/build.yaml @@ -240,7 +240,7 @@ filegroups: - src/core/lib/channel/connected_channel.c - src/core/lib/channel/http_client_filter.c - src/core/lib/channel/http_server_filter.c - - src/core/lib/compression/compression_algorithm.c + - src/core/lib/compression/compression.c - src/core/lib/compression/message_compress.c - src/core/lib/debug/trace.c - src/core/lib/http/format_request.c diff --git a/config.m4 b/config.m4 index a760d982d13..cd653d3151e 100644 --- a/config.m4 +++ b/config.m4 @@ -90,7 +90,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/channel/connected_channel.c \ src/core/lib/channel/http_client_filter.c \ src/core/lib/channel/http_server_filter.c \ - src/core/lib/compression/compression_algorithm.c \ + src/core/lib/compression/compression.c \ src/core/lib/compression/message_compress.c \ src/core/lib/debug/trace.c \ src/core/lib/http/format_request.c \ diff --git a/gRPC.podspec b/gRPC.podspec index 2266e17205e..10a836eb73b 100644 --- a/gRPC.podspec +++ b/gRPC.podspec @@ -352,7 +352,7 @@ Pod::Spec.new do |s| 'src/core/lib/channel/connected_channel.c', 'src/core/lib/channel/http_client_filter.c', 'src/core/lib/channel/http_server_filter.c', - 'src/core/lib/compression/compression_algorithm.c', + 'src/core/lib/compression/compression.c', 'src/core/lib/compression/message_compress.c', 'src/core/lib/debug/trace.c', 'src/core/lib/http/format_request.c', diff --git a/grpc.gemspec b/grpc.gemspec index 406e0477b92..24d512d152c 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -331,7 +331,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/channel/connected_channel.c ) s.files += %w( src/core/lib/channel/http_client_filter.c ) s.files += %w( src/core/lib/channel/http_server_filter.c ) - s.files += %w( src/core/lib/compression/compression_algorithm.c ) + s.files += %w( src/core/lib/compression/compression.c ) s.files += %w( src/core/lib/compression/message_compress.c ) s.files += %w( src/core/lib/debug/trace.c ) s.files += %w( src/core/lib/http/format_request.c ) diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 4f550b42a2d..d720f27a8fa 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -47,7 +47,9 @@ #include <grpc++/impl/codegen/serialization_traits.h> #include <grpc++/impl/codegen/status.h> #include <grpc++/impl/codegen/string_ref.h> + #include <grpc/impl/codegen/alloc.h> +#include <grpc/impl/codegen/compression_types.h> #include <grpc/impl/codegen/grpc_types.h> struct grpc_byte_buffer; @@ -187,6 +189,8 @@ class CallOpSendInitialMetadata { flags_ = flags; initial_metadata_count_ = metadata.size(); initial_metadata_ = FillMetadataArray(metadata); + // TODO(dgq): expose compression level in API so it can be properly set. + maybe_compression_level_.is_set = false; } protected: @@ -198,6 +202,10 @@ class CallOpSendInitialMetadata { op->reserved = NULL; op->data.send_initial_metadata.count = initial_metadata_count_; op->data.send_initial_metadata.metadata = initial_metadata_; + op->data.send_initial_metadata.maybe_compression_level.is_set = + maybe_compression_level_.is_set; + op->data.send_initial_metadata.maybe_compression_level.level = + maybe_compression_level_.level; } void FinishOp(bool* status, int max_message_size) { if (!send_) return; @@ -209,6 +217,10 @@ class CallOpSendInitialMetadata { uint32_t flags_; size_t initial_metadata_count_; grpc_metadata* initial_metadata_; + struct { + bool is_set; + grpc_compression_level level; + } maybe_compression_level_; }; class CallOpSendMessage { diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 8525cb70cbb..54f01d11b5d 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -66,29 +66,43 @@ class ServerBuilder { /// The service must exist for the lifetime of the \a Server instance returned /// by \a BuildAndStart(). /// Matches requests with any :authority - void RegisterService(Service* service); + ServerBuilder& RegisterService(Service* service); /// Register a generic service. /// Matches requests with any :authority - void RegisterAsyncGenericService(AsyncGenericService* service); + ServerBuilder& RegisterAsyncGenericService(AsyncGenericService* service); /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the \a Server instance returned /// by BuildAndStart(). /// Only matches requests with :authority \a host - void RegisterService(const grpc::string& host, Service* service); + ServerBuilder& RegisterService(const grpc::string& host, Service* service); /// Set max message size in bytes. - void SetMaxMessageSize(int max_message_size) { + ServerBuilder& SetMaxMessageSize(int max_message_size) { max_message_size_ = max_message_size; + return *this; } - /// Set the compression options to be used by the server. - void SetCompressionOptions(const grpc_compression_options& options) { - compression_options_ = options; - } + /// Set the support status for compression algorithms. All algorithms are + /// enabled by default. + /// + /// Incoming calls compressed with an unsupported algorithm will fail with + /// GRPC_STATUS_UNIMPLEMENTED. + ServerBuilder& SetCompressionAlgorithmSupportStatus( + grpc_compression_algorithm algorithm, bool enabled); + + /// The default compression level to use for all channel calls in the + /// absence of a call-specific level. + ServerBuilder& SetDefaultCompressionLevel(grpc_compression_level level); + + /// The default compression algorithm to use for all channel calls in the + /// absence of a call-specific level. Note that it overrides any compression + /// level set by \a SetDefaultCompressionLevel. + ServerBuilder& SetDefaultCompressionAlgorithm( + grpc_compression_algorithm algorithm); - void SetOption(std::unique_ptr<ServerBuilderOption> option); + ServerBuilder& SetOption(std::unique_ptr<ServerBuilderOption> option); /// Tries to bind \a server to the given \a addr. /// @@ -101,9 +115,9 @@ class ServerBuilder { /// number. \a nullptr otherwise. /// // TODO(dgq): the "port" part seems to be a misnomer. - void AddListeningPort(const grpc::string& addr, - std::shared_ptr<ServerCredentials> creds, - int* selected_port = nullptr); + ServerBuilder& AddListeningPort(const grpc::string& addr, + std::shared_ptr<ServerCredentials> creds, + int* selected_port = nullptr); /// Add a completion queue for handling asynchronous services /// Caller is required to keep this completion queue live until @@ -144,7 +158,6 @@ class ServerBuilder { }; int max_message_size_; - grpc_compression_options compression_options_; std::vector<std::unique_ptr<ServerBuilderOption>> options_; std::vector<std::unique_ptr<NamedService>> services_; std::vector<Port> ports_; @@ -152,6 +165,15 @@ class ServerBuilder { std::shared_ptr<ServerCredentials> creds_; std::map<grpc::string, std::unique_ptr<ServerBuilderPlugin>> plugins_; AsyncGenericService* generic_service_; + struct { + bool is_set; + grpc_compression_level level; + } maybe_default_compression_level_; + struct { + bool is_set; + grpc_compression_algorithm algorithm; + } maybe_default_compression_algorithm_; + uint32_t enabled_compression_algorithms_bitset_; }; } // namespace grpc diff --git a/include/grpc/compression.h b/include/grpc/compression.h index 8de4b133d4c..22bcf0e3020 100644 --- a/include/grpc/compression.h +++ b/include/grpc/compression.h @@ -51,7 +51,8 @@ GRPCAPI int grpc_compression_algorithm_parse( grpc_compression_algorithm *algorithm); /** Updates \a name with the encoding name corresponding to a valid \a - * algorithm. Returns 1 upon success, 0 otherwise. */ + * algorithm. Note that \a name is statically allocated and must *not* be freed. + * Returns 1 upon success, 0 otherwise. */ GRPCAPI int grpc_compression_algorithm_name( grpc_compression_algorithm algorithm, char **name); diff --git a/include/grpc/impl/codegen/compression_types.h b/include/grpc/impl/codegen/compression_types.h index 8d2ec3b9d71..9065d1edd02 100644 --- a/include/grpc/impl/codegen/compression_types.h +++ b/include/grpc/impl/codegen/compression_types.h @@ -35,11 +35,17 @@ #define GRPC_IMPL_CODEGEN_COMPRESSION_TYPES_H #include <grpc/impl/codegen/port_platform.h> +#include <stdbool.h> #ifdef __cplusplus extern "C" { #endif +/** To be used as initial metadata key for the request of a concrete compression + * algorithm */ +#define GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY \ + "grpc-internal-encoding-request" + /** To be used in channel arguments */ #define GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM \ "grpc.default_compression_algorithm" @@ -74,15 +80,24 @@ typedef struct grpc_compression_options { */ uint32_t enabled_algorithms_bitset; - /** The default channel compression algorithm. It'll be used in the absence of + /** The default channel compression level. It'll be used in the absence of * call specific settings. This option corresponds to the channel argument key - * behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM */ - grpc_compression_algorithm default_compression_algorithm; + * behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL. If present, takes + * precedence over \a default_algorithm. + * TODO(dgq): currently only available for server channels. */ + struct { + bool is_set; + grpc_compression_level level; + } default_level; - /** The default channel compression level. It'll be used in the absence of + /** The default channel compression algorithm. It'll be used in the absence of * call specific settings. This option corresponds to the channel argument key - * behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL */ - grpc_compression_algorithm default_compression_level; + * behind \a GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM. */ + struct { + bool is_set; + grpc_compression_algorithm algorithm; + } default_algorithm; + } grpc_compression_options; #ifdef __cplusplus diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 7181be4a34d..c51ffa44930 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -334,6 +334,12 @@ typedef struct grpc_op { struct { size_t count; grpc_metadata *metadata; + /** If \a is_set, \a compression_level will be used for the call. + * Otherwise, \a compression_level won't be considered */ + struct { + uint8_t is_set; + grpc_compression_level level; + } maybe_compression_level; } send_initial_metadata; grpc_byte_buffer *send_message; struct { diff --git a/package.xml b/package.xml index edd81623499..9fa63e30909 100644 --- a/package.xml +++ b/package.xml @@ -338,7 +338,7 @@ <file baseinstalldir="/" name="src/core/lib/channel/connected_channel.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/channel/http_client_filter.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/channel/http_server_filter.c" role="src" /> - <file baseinstalldir="/" name="src/core/lib/compression/compression_algorithm.c" role="src" /> + <file baseinstalldir="/" name="src/core/lib/compression/compression.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/compression/message_compress.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/debug/trace.c" role="src" /> <file baseinstalldir="/" name="src/core/lib/http/format_request.c" role="src" /> diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index 569be4dc282..d53ce904a99 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -35,6 +35,7 @@ #include <grpc/grpc.h> #include "src/core/lib/support/string.h" +#include <grpc/compression.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> @@ -181,6 +182,7 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( grpc_channel_args *grpc_channel_args_set_compression_algorithm( grpc_channel_args *a, grpc_compression_algorithm algorithm) { + GPR_ASSERT(algorithm < GRPC_COMPRESS_ALGORITHMS_COUNT); grpc_arg tmp; tmp.type = GRPC_ARG_INTEGER; tmp.key = GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM; @@ -200,7 +202,8 @@ static int find_compression_algorithm_states_bitset(const grpc_channel_args *a, !strcmp(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, a->args[i].key)) { *states_arg = &a->args[i].value.integer; - return 1; /* GPR_TRUE */ + **states_arg |= 0x1; /* forcefully enable support for no compression */ + return 1; } } } @@ -214,10 +217,18 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( const int states_arg_found = find_compression_algorithm_states_bitset(*a, &states_arg); - if (states_arg_found) { + if (grpc_channel_args_get_compression_algorithm(*a) == algorithm && + state == 0) { + char *algo_name = NULL; + GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name) != 0); + gpr_log(GPR_ERROR, + "Tried to disable default compression algorithm '%s'. The " + "operation has been ignored.", + algo_name); + } else if (states_arg_found) { if (state != 0) { GPR_BITSET((unsigned *)states_arg, algorithm); - } else { + } else if (algorithm != GRPC_COMPRESS_NONE) { GPR_BITCLEAR((unsigned *)states_arg, algorithm); } } else { @@ -229,7 +240,7 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; if (state != 0) { GPR_BITSET((unsigned *)&tmp.value.integer, algorithm); - } else { + } else if (algorithm != GRPC_COMPRESS_NONE) { GPR_BITCLEAR((unsigned *)&tmp.value.integer, algorithm); } result = grpc_channel_args_copy_and_add(*a, &tmp, 1); @@ -239,11 +250,11 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( return result; } -int grpc_channel_args_compression_algorithm_get_states( +uint32_t grpc_channel_args_compression_algorithm_get_states( const grpc_channel_args *a) { int *states_arg; if (find_compression_algorithm_states_bitset(a, &states_arg)) { - return *states_arg; + return (uint32_t)*states_arg; } else { return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */ } diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 23c7b7b897b..653d04f4279 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -81,7 +81,7 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( * * The i-th bit of the returned bitset corresponds to the i-th entry in the * grpc_compression_algorithm enum. */ -int grpc_channel_args_compression_algorithm_get_states( +uint32_t grpc_channel_args_compression_algorithm_get_states( const grpc_channel_args *a); int grpc_channel_args_compare(const grpc_channel_args *a, diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 7c8c1d6f31a..4fe9a7f045a 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -73,8 +73,8 @@ typedef struct call_data { typedef struct channel_data { /** The default, channel-level, compression algorithm */ grpc_compression_algorithm default_compression_algorithm; - /** Compression options for the channel */ - grpc_compression_options compression_options; + /** Bitset of enabled algorithms */ + uint32_t enabled_algorithms_bitset; /** Supported compression algorithms */ uint32_t supported_compression_algorithms; } channel_data; @@ -96,9 +96,8 @@ static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { md_c_str); calld->compression_algorithm = GRPC_COMPRESS_NONE; } - if (grpc_compression_options_is_algorithm_enabled( - &channeld->compression_options, calld->compression_algorithm) == - 0) { + if (!GPR_BITGET(channeld->enabled_algorithms_bitset, + calld->compression_algorithm)) { gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s' (previously disabled). " "Ignoring.", @@ -282,32 +281,26 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { channel_data *channeld = elem->channel_data; - grpc_compression_algorithm algo_idx; - grpc_compression_options_init(&channeld->compression_options); - channeld->compression_options.enabled_algorithms_bitset = - (uint32_t)grpc_channel_args_compression_algorithm_get_states( - args->channel_args); + channeld->enabled_algorithms_bitset = + grpc_channel_args_compression_algorithm_get_states(args->channel_args); channeld->default_compression_algorithm = grpc_channel_args_get_compression_algorithm(args->channel_args); /* Make sure the default isn't disabled. */ - if (!grpc_compression_options_is_algorithm_enabled( - &channeld->compression_options, - channeld->default_compression_algorithm)) { + if (!GPR_BITGET(channeld->enabled_algorithms_bitset, + channeld->default_compression_algorithm)) { gpr_log(GPR_DEBUG, "compression algorithm %d not enabled: switching to none", channeld->default_compression_algorithm); channeld->default_compression_algorithm = GRPC_COMPRESS_NONE; } - channeld->compression_options.default_compression_algorithm = - channeld->default_compression_algorithm; - channeld->supported_compression_algorithms = 0; - for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { + channeld->supported_compression_algorithms = 1; /* always support identity */ + for (grpc_compression_algorithm algo_idx = 1; + algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { /* skip disabled algorithms */ - if (grpc_compression_options_is_algorithm_enabled( - &channeld->compression_options, algo_idx) == 0) { + if (!GPR_BITGET(channeld->enabled_algorithms_bitset, algo_idx)) { continue; } channeld->supported_compression_algorithms |= 1u << algo_idx; diff --git a/src/core/lib/channel/compress_filter.h b/src/core/lib/channel/compress_filter.h index 0ce5d08837d..e4a2a829d59 100644 --- a/src/core/lib/channel/compress_filter.h +++ b/src/core/lib/channel/compress_filter.h @@ -34,9 +34,9 @@ #ifndef GRPC_CORE_LIB_CHANNEL_COMPRESS_FILTER_H #define GRPC_CORE_LIB_CHANNEL_COMPRESS_FILTER_H -#include "src/core/lib/channel/channel_stack.h" +#include <grpc/impl/codegen/compression_types.h> -#define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "grpc-internal-encoding-request" +#include "src/core/lib/channel/channel_stack.h" extern int grpc_compression_trace; @@ -48,7 +48,7 @@ extern int grpc_compression_trace; * - Channel configuration, as established at channel creation time. * - The metadata accompanying the outgoing data to be compressed. This is * taken as a request only. We may choose not to honor it. The metadata key - * is given by \a GRPC_COMPRESS_REQUEST_ALGORITHM_KEY. + * is given by \a GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY. * * Compression can be disabled for concrete messages (for instance in order to * prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set in diff --git a/src/core/lib/compression/compression_algorithm.c b/src/core/lib/compression/compression.c similarity index 98% rename from src/core/lib/compression/compression_algorithm.c rename to src/core/lib/compression/compression.c index 820871d579b..54efb5e855c 100644 --- a/src/core/lib/compression/compression_algorithm.c +++ b/src/core/lib/compression/compression.c @@ -125,6 +125,28 @@ grpc_mdelem *grpc_compression_encoding_mdelem( return NULL; } +void grpc_compression_options_init(grpc_compression_options *opts) { + memset(opts, 0, sizeof(*opts)); + /* all enabled by default */ + opts->enabled_algorithms_bitset = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; +} + +void grpc_compression_options_enable_algorithm( + grpc_compression_options *opts, grpc_compression_algorithm algorithm) { + GPR_BITSET(&opts->enabled_algorithms_bitset, algorithm); +} + +void grpc_compression_options_disable_algorithm( + grpc_compression_options *opts, grpc_compression_algorithm algorithm) { + GPR_BITCLEAR(&opts->enabled_algorithms_bitset, algorithm); +} + +int grpc_compression_options_is_algorithm_enabled( + const grpc_compression_options *opts, + grpc_compression_algorithm algorithm) { + return GPR_BITGET(opts->enabled_algorithms_bitset, algorithm); +} + /* TODO(dgq): Add the ability to specify parameters to the individual * compression algorithms */ grpc_compression_algorithm grpc_compression_algorithm_for_level( @@ -180,25 +202,3 @@ grpc_compression_algorithm grpc_compression_algorithm_for_level( abort(); }; } - -void grpc_compression_options_init(grpc_compression_options *opts) { - opts->enabled_algorithms_bitset = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; - opts->default_compression_algorithm = GRPC_COMPRESS_NONE; -} - -void grpc_compression_options_enable_algorithm( - grpc_compression_options *opts, grpc_compression_algorithm algorithm) { - GPR_BITSET(&opts->enabled_algorithms_bitset, algorithm); -} - -void grpc_compression_options_disable_algorithm( - grpc_compression_options *opts, grpc_compression_algorithm algorithm) { - GPR_BITCLEAR(&opts->enabled_algorithms_bitset, algorithm); -} - -int grpc_compression_options_is_algorithm_enabled( - const grpc_compression_options *opts, - grpc_compression_algorithm algorithm) { - if (algorithm >= GRPC_COMPRESS_ALGORITHMS_COUNT) return 0; - return GPR_BITGET(opts->enabled_algorithms_bitset, algorithm); -} diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index e899bc8098b..a9b1e25a771 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -40,6 +40,7 @@ #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/slice.h> #include <grpc/support/string_util.h> #include <grpc/support/useful.h> @@ -52,7 +53,9 @@ #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/completion_queue.h" +#include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/static_metadata.h" +#include "src/core/lib/transport/transport.h" /** The maximum number of concurrent batches possible. Based upon the maximum number of individually queueable ops in the batch @@ -154,8 +157,8 @@ struct grpc_call { /* Call stats: only valid after trailing metadata received */ grpc_call_stats stats; - /* Compression algorithm for the call */ - grpc_compression_algorithm compression_algorithm; + /* Compression algorithm for *incoming* data */ + grpc_compression_algorithm incoming_compression_algorithm; /* Supported encodings (compression algorithms), a bitset */ uint32_t encodings_accepted_by_peer; @@ -214,6 +217,9 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description); +static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_status_code status, + const char *description); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, bool success); static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, @@ -399,21 +405,27 @@ static void set_status_code(grpc_call *call, status_source source, /* TODO(ctiller): what to do about the flush that was previously here */ } -static void set_compression_algorithm(grpc_call *call, - grpc_compression_algorithm algo) { +static void set_incoming_compression_algorithm( + grpc_call *call, grpc_compression_algorithm algo) { GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT); - call->compression_algorithm = algo; + call->incoming_compression_algorithm = algo; } grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm( grpc_call *call) { grpc_compression_algorithm algorithm; gpr_mu_lock(&call->mu); - algorithm = call->compression_algorithm; + algorithm = call->incoming_compression_algorithm; gpr_mu_unlock(&call->mu); return algorithm; } +static grpc_compression_algorithm compression_algorithm_for_level_locked( + grpc_call *call, grpc_compression_level level) { + return grpc_compression_algorithm_for_level(level, + call->encodings_accepted_by_peer); +} + uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) { uint32_t flags; gpr_mu_lock(&call->mu); @@ -539,15 +551,28 @@ static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) { return (grpc_linked_mdelem *)&md->internal_data; } +static grpc_metadata *get_md_elem(grpc_metadata *metadata, + grpc_metadata *additional_metadata, int i, + int count) { + grpc_metadata *res = + i < count ? &metadata[i] : &additional_metadata[i - count]; + GPR_ASSERT(res); + return res; +} + static int prepare_application_metadata(grpc_call *call, int count, grpc_metadata *metadata, int is_trailing, - int prepend_extra_metadata) { + int prepend_extra_metadata, + grpc_metadata *additional_metadata, + int additional_metadata_count) { + int total_count = count + additional_metadata_count; int i; grpc_metadata_batch *batch = &call->metadata_batch[0 /* is_receiving */][is_trailing]; - for (i = 0; i < count; i++) { - grpc_metadata *md = &metadata[i]; + for (i = 0; i < total_count; i++) { + const grpc_metadata *md = + get_md_elem(metadata, additional_metadata, i, count); grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); l->md = grpc_mdelem_from_string_and_buffer( @@ -566,9 +591,10 @@ static int prepare_application_metadata(grpc_call *call, int count, break; } } - if (i != count) { + if (i != total_count) { for (int j = 0; j <= i; j++) { - grpc_metadata *md = &metadata[j]; + const grpc_metadata *md = + get_md_elem(metadata, additional_metadata, j, count); grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; GRPC_MDELEM_UNREF(l->md); } @@ -589,24 +615,36 @@ static int prepare_application_metadata(grpc_call *call, int count, } } } - for (i = 1; i < count; i++) { - linked_from_md(&metadata[i])->prev = linked_from_md(&metadata[i - 1]); + for (i = 1; i < total_count; i++) { + grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count); + grpc_metadata *prev_md = + get_md_elem(metadata, additional_metadata, i - 1, count); + linked_from_md(md)->prev = linked_from_md(prev_md); } - for (i = 0; i < count - 1; i++) { - linked_from_md(&metadata[i])->next = linked_from_md(&metadata[i + 1]); + for (i = 0; i < total_count - 1; i++) { + grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count); + grpc_metadata *next_md = + get_md_elem(metadata, additional_metadata, i + 1, count); + linked_from_md(md)->next = linked_from_md(next_md); } - switch (prepend_extra_metadata * 2 + (count != 0)) { + + switch (prepend_extra_metadata * 2 + (total_count != 0)) { case 0: /* no prepend, no metadata => nothing to do */ batch->list.head = batch->list.tail = NULL; break; - case 1: + case 1: { /* metadata, but no prepend */ - batch->list.head = linked_from_md(&metadata[0]); - batch->list.tail = linked_from_md(&metadata[count - 1]); + grpc_metadata *first_md = + get_md_elem(metadata, additional_metadata, 0, count); + grpc_metadata *last_md = + get_md_elem(metadata, additional_metadata, total_count - 1, count); + batch->list.head = linked_from_md(first_md); + batch->list.tail = linked_from_md(last_md); batch->list.head->prev = NULL; batch->list.tail->next = NULL; break; + } case 2: /* prepend, but no md */ batch->list.head = &call->send_extra_metadata[0]; @@ -615,17 +653,22 @@ static int prepare_application_metadata(grpc_call *call, int count, batch->list.head->prev = NULL; batch->list.tail->next = NULL; break; - case 3: + case 3: { /* prepend AND md */ + grpc_metadata *first_md = + get_md_elem(metadata, additional_metadata, 0, count); + grpc_metadata *last_md = + get_md_elem(metadata, additional_metadata, total_count - 1, count); batch->list.head = &call->send_extra_metadata[0]; call->send_extra_metadata[call->send_extra_metadata_count - 1].next = - linked_from_md(&metadata[0]); - linked_from_md(&metadata[0])->prev = + linked_from_md(first_md); + linked_from_md(first_md)->prev = &call->send_extra_metadata[call->send_extra_metadata_count - 1]; - batch->list.tail = linked_from_md(&metadata[count - 1]); + batch->list.tail = linked_from_md(last_md); batch->list.head->prev = NULL; batch->list.tail->next = NULL; break; + } default: GPR_UNREACHABLE_CODE(return 0); } @@ -694,48 +737,102 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, return r; } -typedef struct cancel_closure { +typedef struct termination_closure { grpc_closure closure; grpc_call *call; grpc_status_code status; -} cancel_closure; + gpr_slice optional_message; + grpc_closure *op_closure; + enum { TC_CANCEL, TC_CLOSE } type; +} termination_closure; -static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) { - cancel_closure *cc = ccp; - GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel"); - gpr_free(cc); +static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, bool success) { + termination_closure *tc = tcp; + if (tc->type == TC_CANCEL) { + GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "cancel"); + } + if (tc->type == TC_CLOSE) { + GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "close"); + } + gpr_slice_unref(tc->optional_message); + if (tc->op_closure != NULL) { + grpc_exec_ctx_enqueue(exec_ctx, tc->op_closure, true, NULL); + } + gpr_free(tc); } -static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) { +static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, bool success) { grpc_transport_stream_op op; - cancel_closure *cc = ccp; + termination_closure *tc = tcp; memset(&op, 0, sizeof(op)); - op.cancel_with_status = cc->status; + op.cancel_with_status = tc->status; /* reuse closure to catch completion */ - grpc_closure_init(&cc->closure, done_cancel, cc); - op.on_complete = &cc->closure; - execute_op(exec_ctx, cc->call, &op); + grpc_closure_init(&tc->closure, done_termination, tc); + op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &op); +} + +static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, bool success) { + grpc_transport_stream_op op; + termination_closure *tc = tcp; + memset(&op, 0, sizeof(op)); + tc->optional_message = gpr_slice_ref(tc->optional_message); + grpc_transport_stream_op_add_close(&op, tc->status, &tc->optional_message); + /* reuse closure to catch completion */ + grpc_closure_init(&tc->closure, done_termination, tc); + tc->op_closure = op.on_complete; + op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &op); +} + +static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, + termination_closure *tc) { + grpc_mdstr *details = NULL; + if (GPR_SLICE_LENGTH(tc->optional_message) > 0) { + tc->optional_message = gpr_slice_ref(tc->optional_message); + details = grpc_mdstr_from_slice(tc->optional_message); + } + + set_status_code(tc->call, STATUS_FROM_API_OVERRIDE, (uint32_t)tc->status); + set_status_details(tc->call, STATUS_FROM_API_OVERRIDE, details); + + if (tc->type == TC_CANCEL) { + grpc_closure_init(&tc->closure, send_cancel, tc); + GRPC_CALL_INTERNAL_REF(tc->call, "cancel"); + } else if (tc->type == TC_CLOSE) { + grpc_closure_init(&tc->closure, send_close, tc); + GRPC_CALL_INTERNAL_REF(tc->call, "close"); + } + grpc_exec_ctx_enqueue(exec_ctx, &tc->closure, true, NULL); + return GRPC_CALL_OK; } static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description) { - grpc_mdstr *details = - description ? grpc_mdstr_from_string(description) : NULL; - cancel_closure *cc = gpr_malloc(sizeof(*cc)); - + termination_closure *tc = gpr_malloc(sizeof(*tc)); + memset(tc, 0, sizeof(termination_closure)); + tc->type = TC_CANCEL; + tc->call = c; + tc->optional_message = gpr_slice_from_copied_string(description); GPR_ASSERT(status != GRPC_STATUS_OK); + tc->status = status; - set_status_code(c, STATUS_FROM_API_OVERRIDE, (uint32_t)status); - set_status_details(c, STATUS_FROM_API_OVERRIDE, details); + return terminate_with_status(exec_ctx, tc); +} - grpc_closure_init(&cc->closure, send_cancel, cc); - cc->call = c; - cc->status = status; - GRPC_CALL_INTERNAL_REF(c, "cancel"); - grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, true, NULL); +static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_status_code status, + const char *description) { + termination_closure *tc = gpr_malloc(sizeof(*tc)); + memset(tc, 0, sizeof(termination_closure)); + tc->type = TC_CLOSE; + tc->call = c; + tc->optional_message = gpr_slice_from_copied_string(description); + GPR_ASSERT(status != GRPC_STATUS_OK); + tc->status = status; - return GRPC_CALL_OK; + return terminate_with_status(exec_ctx, tc); } static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, @@ -876,9 +973,9 @@ static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) { if (elem == NULL) { return NULL; } else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) { - GPR_TIMER_BEGIN("compression_algorithm", 0); - set_compression_algorithm(call, decode_compression(elem)); - GPR_TIMER_END("compression_algorithm", 0); + GPR_TIMER_BEGIN("incoming_compression_algorithm", 0); + set_incoming_compression_algorithm(call, decode_compression(elem)); + GPR_TIMER_END("incoming_compression_algorithm", 0); return NULL; } else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) { GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); @@ -1041,9 +1138,9 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl, } else { call->test_only_last_message_flags = call->receiving_stream->flags; if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && - (call->compression_algorithm > GRPC_COMPRESS_NONE)) { + (call->incoming_compression_algorithm > GRPC_COMPRESS_NONE)) { *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create( - NULL, 0, call->compression_algorithm); + NULL, 0, call->incoming_compression_algorithm); } else { *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); } @@ -1071,6 +1168,56 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } } +static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, + batch_control *bctl) { + grpc_call *call = bctl->call; + /* validate call->incoming_compression_algorithm */ + if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) { + const grpc_compression_algorithm algo = + call->incoming_compression_algorithm; + char *error_msg = NULL; + const grpc_compression_options compression_options = + grpc_channel_compression_options(call->channel); + /* check if algorithm is known */ + if (algo >= GRPC_COMPRESS_ALGORITHMS_COUNT) { + gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.", + algo); + gpr_log(GPR_ERROR, error_msg); + close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); + } else if (grpc_compression_options_is_algorithm_enabled( + &compression_options, algo) == 0) { + /* check if algorithm is supported by current channel config */ + char *algo_name; + grpc_compression_algorithm_name(algo, &algo_name); + gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", + algo_name); + gpr_log(GPR_ERROR, error_msg); + close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); + } else { + call->incoming_compression_algorithm = algo; + } + gpr_free(error_msg); + } + + /* make sure the received grpc-encoding is amongst the ones listed in + * grpc-accept-encoding */ + GPR_ASSERT(call->encodings_accepted_by_peer != 0); + if (!GPR_BITGET(call->encodings_accepted_by_peer, + call->incoming_compression_algorithm)) { + extern int grpc_compression_trace; + if (grpc_compression_trace) { + char *algo_name; + grpc_compression_algorithm_name(call->incoming_compression_algorithm, + &algo_name); + gpr_log(GPR_ERROR, + "Compression algorithm (grpc-encoding = '%s') not present in " + "the bitset of accepted encodings (grpc-accept-encodings: " + "'0x%x')", + algo_name, call->encodings_accepted_by_peer); + } + } +} + static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { batch_control *bctl = bctlp; @@ -1085,24 +1232,10 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; grpc_metadata_batch_filter(md, recv_initial_filter, call); - /* make sure the received grpc-encoding is amongst the ones listed in - * grpc-accept-encoding */ - - GPR_ASSERT(call->encodings_accepted_by_peer != 0); - if (!GPR_BITGET(call->encodings_accepted_by_peer, - call->compression_algorithm)) { - extern int grpc_compression_trace; - if (grpc_compression_trace) { - char *algo_name; - grpc_compression_algorithm_name(call->compression_algorithm, - &algo_name); - gpr_log(GPR_ERROR, - "Compression algorithm (grpc-encoding = '%s') not present in " - "the bitset of accepted encodings (grpc-accept-encodings: " - "'0x%x')", - algo_name, call->encodings_accepted_by_peer); - } - } + GPR_TIMER_BEGIN("validate_filtered_metadata", 0); + validate_filtered_metadata(exec_ctx, bctl); + GPR_TIMER_END("validate_filtered_metadata", 0); + if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != 0 && !call->is_client) { @@ -1245,7 +1378,40 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - if (op->data.send_initial_metadata.count > INT_MAX) { + /* process compression level */ + grpc_metadata compression_md; + memset(&compression_md, 0, sizeof(grpc_metadata)); + size_t additional_metadata_count = 0; + grpc_compression_level effective_compression_level; + bool level_set = false; + if (op->data.send_initial_metadata.maybe_compression_level.is_set) { + effective_compression_level = + op->data.send_initial_metadata.maybe_compression_level.level; + level_set = true; + } else { + const grpc_compression_options copts = + grpc_channel_compression_options(call->channel); + level_set = copts.default_level.is_set; + if (level_set) { + effective_compression_level = copts.default_level.level; + } + } + if (level_set && !call->is_client) { + const grpc_compression_algorithm calgo = + compression_algorithm_for_level_locked( + call, effective_compression_level); + char *calgo_name; + grpc_compression_algorithm_name(calgo, &calgo_name); + // the following will be picked up by the compress filter and used as + // the call's compression algorithm. + compression_md.key = GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY; + compression_md.value = calgo_name; + compression_md.value_length = strlen(calgo_name); + additional_metadata_count++; + } + + if (op->data.send_initial_metadata.count + additional_metadata_count > + INT_MAX) { error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } @@ -1253,7 +1419,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->sent_initial_metadata = 1; if (!prepare_application_metadata( call, (int)op->data.send_initial_metadata.count, - op->data.send_initial_metadata.metadata, 0, call->is_client)) { + op->data.send_initial_metadata.metadata, 0, call->is_client, + &compression_md, (int)additional_metadata_count)) { error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } @@ -1341,7 +1508,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (!prepare_application_metadata( call, (int)op->data.send_status_from_server.trailing_metadata_count, - op->data.send_status_from_server.trailing_metadata, 1, 1)) { + op->data.send_status_from_server.trailing_metadata, 1, 1, NULL, + 0)) { error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } @@ -1530,9 +1698,10 @@ uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; } grpc_compression_algorithm grpc_call_compression_for_level( grpc_call *call, grpc_compression_level level) { gpr_mu_lock(&call->mu); - const uint32_t accepted_encodings = call->encodings_accepted_by_peer; + grpc_compression_algorithm algo = + compression_algorithm_for_level_locked(call, level); gpr_mu_unlock(&call->mu); - return grpc_compression_algorithm_for_level(level, accepted_encodings); + return algo; } const char *grpc_call_error_to_string(grpc_call_error error) { diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index a7ea6fa1f08..f0b3c2e15d1 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -36,16 +36,17 @@ #include <stdlib.h> #include <string.h> +#include <grpc/compression.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel_init.h" -#include "src/core/lib/surface/init.h" #include "src/core/lib/transport/static_metadata.h" /** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS. @@ -64,10 +65,12 @@ typedef struct registered_call { struct grpc_channel { int is_client; uint32_t max_message_length; + grpc_compression_options compression_options; grpc_mdelem *default_authority; gpr_mu registered_call_mu; registered_call *registered_calls; + char *target; }; @@ -111,6 +114,7 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, channel->registered_calls = NULL; channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; + grpc_compression_options_init(&channel->compression_options); if (args) { for (size_t i = 0; i < args->num_args; i++) { if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) { @@ -151,6 +155,27 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, ":authority", args->args[i].value.string); } } + } else if (0 == strcmp(args->args[i].key, + GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL)) { + channel->compression_options.default_level.is_set = true; + GPR_ASSERT(args->args[i].value.integer >= 0 && + args->args[i].value.integer < GRPC_COMPRESS_LEVEL_COUNT); + channel->compression_options.default_level.level = + (grpc_compression_level)args->args[i].value.integer; + } else if (0 == strcmp(args->args[i].key, + GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) { + channel->compression_options.default_algorithm.is_set = true; + GPR_ASSERT(args->args[i].value.integer >= 0 && + args->args[i].value.integer < + GRPC_COMPRESS_ALGORITHMS_COUNT); + channel->compression_options.default_algorithm.algorithm = + (grpc_compression_algorithm)args->args[i].value.integer; + } else if (0 == + strcmp(args->args[i].key, + GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) { + channel->compression_options.enabled_algorithms_bitset = + (uint32_t)args->args[i].value.integer | + 0x1; /* always support no compression */ } } grpc_channel_args_destroy(args); @@ -324,6 +349,11 @@ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) { return CHANNEL_STACK_FROM_CHANNEL(channel); } +grpc_compression_options grpc_channel_compression_options( + const grpc_channel *channel) { + return channel->compression_options; +} + grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { char tmp[GPR_LTOA_MIN_BUFSIZE]; switch (i) { diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index ff3debc31f2..7eff7b88836 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -76,4 +76,8 @@ void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel_internal_unref(exec_ctx, channel) #endif +/** Return the channel's compression options. */ +grpc_compression_options grpc_channel_compression_options( + const grpc_channel *channel); + #endif /* GRPC_CORE_LIB_SURFACE_CHANNEL_H */ diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index 32c7794ade7..d3e5ce0c4a6 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -33,15 +33,14 @@ #include <grpc++/client_context.h> -#include <grpc++/security/credentials.h> -#include <grpc++/server_context.h> -#include <grpc++/support/time.h> #include <grpc/compression.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> -#include "src/core/lib/channel/compress_filter.h" +#include <grpc++/security/credentials.h> +#include <grpc++/server_context.h> +#include <grpc++/support/time.h> namespace grpc { @@ -112,7 +111,7 @@ void ClientContext::set_compression_algorithm( abort(); } GPR_ASSERT(algorithm_name != nullptr); - AddMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name); + AddMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name); } void ClientContext::TryCancel() { diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 54feac39825..279981744ad 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -37,6 +37,8 @@ #include <grpc++/server.h> #include <grpc/support/cpu.h> #include <grpc/support/log.h> + +#include "include/grpc/support/useful.h" #include "src/cpp/server/thread_pool_interface.h" namespace grpc { @@ -52,12 +54,18 @@ static void do_plugin_list_init(void) { ServerBuilder::ServerBuilder() : max_message_size_(-1), generic_service_(nullptr) { - grpc_compression_options_init(&compression_options_); gpr_once_init(&once_init_plugin_list, do_plugin_list_init); for (auto factory : (*g_plugin_factory_list)) { std::unique_ptr<ServerBuilderPlugin> plugin = factory(); plugins_[plugin->name()] = std::move(plugin); } + // all compression algorithms enabled by default. + enabled_compression_algorithms_bitset_ = + (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; + memset(&maybe_default_compression_level_, 0, + sizeof(maybe_default_compression_level_)); + memset(&maybe_default_compression_algorithm_, 0, + sizeof(maybe_default_compression_algorithm_)); } std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue( @@ -67,35 +75,65 @@ std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue( return std::unique_ptr<ServerCompletionQueue>(cq); } -void ServerBuilder::RegisterService(Service* service) { +ServerBuilder& ServerBuilder::RegisterService(Service* service) { services_.emplace_back(new NamedService(service)); + return *this; } -void ServerBuilder::RegisterService(const grpc::string& addr, - Service* service) { +ServerBuilder& ServerBuilder::RegisterService(const grpc::string& addr, + Service* service) { services_.emplace_back(new NamedService(addr, service)); + return *this; } -void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) { +ServerBuilder& ServerBuilder::RegisterAsyncGenericService( + AsyncGenericService* service) { if (generic_service_) { gpr_log(GPR_ERROR, "Adding multiple AsyncGenericService is unsupported for now. " "Dropping the service %p", service); - return; + } else { + generic_service_ = service; } - generic_service_ = service; + return *this; } -void ServerBuilder::SetOption(std::unique_ptr<ServerBuilderOption> option) { +ServerBuilder& ServerBuilder::SetOption( + std::unique_ptr<ServerBuilderOption> option) { options_.push_back(std::move(option)); + return *this; +} + +ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus( + grpc_compression_algorithm algorithm, bool enabled) { + if (enabled) { + GPR_BITSET(&enabled_compression_algorithms_bitset_, algorithm); + } else { + GPR_BITCLEAR(&enabled_compression_algorithms_bitset_, algorithm); + } + return *this; } -void ServerBuilder::AddListeningPort(const grpc::string& addr, - std::shared_ptr<ServerCredentials> creds, - int* selected_port) { +ServerBuilder& ServerBuilder::SetDefaultCompressionLevel( + grpc_compression_level level) { + maybe_default_compression_level_.level = level; + return *this; +} + +ServerBuilder& ServerBuilder::SetDefaultCompressionAlgorithm( + grpc_compression_algorithm algorithm) { + maybe_default_compression_algorithm_.is_set = true; + maybe_default_compression_algorithm_.algorithm = algorithm; + return *this; +} + +ServerBuilder& ServerBuilder::AddListeningPort( + const grpc::string& addr, std::shared_ptr<ServerCredentials> creds, + int* selected_port) { Port port = {addr, creds, selected_port}; ports_.push_back(port); + return *this; } std::unique_ptr<Server> ServerBuilder::BuildAndStart() { @@ -128,7 +166,15 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { args.SetInt(GRPC_ARG_MAX_MESSAGE_LENGTH, max_message_size_); } args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, - compression_options_.enabled_algorithms_bitset); + enabled_compression_algorithms_bitset_); + if (maybe_default_compression_level_.is_set) { + args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL, + maybe_default_compression_level_.level); + } + if (maybe_default_compression_algorithm_.is_set) { + args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, + maybe_default_compression_algorithm_.algorithm); + } std::unique_ptr<Server> server( new Server(thread_pool.release(), true, max_message_size_, &args)); ServerInitializer* initializer = server->initializer(); diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 204fef1b09e..43117fd1e95 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -42,7 +42,6 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include "src/core/lib/channel/compress_filter.h" #include "src/core/lib/surface/call.h" namespace grpc { @@ -196,6 +195,9 @@ bool ServerContext::IsCancelled() const { } void ServerContext::set_compression_level(grpc_compression_level level) { + // TODO(dgq): get rid of grpc_call_compression_for_level and propagate the + // compression level by adding a new argument to + // CallOpSendInitialMetadata::SendInitialMetadata. const grpc_compression_algorithm algorithm_for_level = grpc_call_compression_for_level(call_, level); set_compression_algorithm(algorithm_for_level); @@ -210,7 +212,7 @@ void ServerContext::set_compression_algorithm( abort(); } GPR_ASSERT(algorithm_name != NULL); - AddInitialMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name); + AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name); } grpc::string ServerContext::peer() const { diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 2a243b94cbd..4782654250c 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -503,6 +503,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx, grpc_metadata_array *initial_metadata, uint32_t write_flags) { /* TODO: don't use magic number */ grpc_op ops[6]; + memset(ops, 0, sizeof(ops)); ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); @@ -555,6 +556,7 @@ grpcsharp_call_start_client_streaming(grpc_call *call, grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[4]; + memset(ops, 0, sizeof(ops)); ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); @@ -596,6 +598,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( size_t send_buffer_len, grpc_metadata_array *initial_metadata, uint32_t write_flags) { /* TODO: don't use magic number */ grpc_op ops[4]; + memset(ops, 0, sizeof(ops)); ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); @@ -638,6 +641,7 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call, grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[2]; + memset(ops, 0, sizeof(ops)); ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); @@ -684,6 +688,7 @@ grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx, int32_t send_empty_initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[2]; + memset(ops, 0, sizeof(ops)); size_t nops = send_empty_initial_metadata ? 2 : 1; ops[0].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); @@ -691,8 +696,6 @@ grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx, ops[0].flags = write_flags; ops[0].reserved = NULL; ops[1].op = GRPC_OP_SEND_INITIAL_METADATA; - ops[1].data.send_initial_metadata.count = 0; - ops[1].data.send_initial_metadata.metadata = NULL; ops[1].flags = 0; ops[1].reserved = NULL; @@ -719,6 +722,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( size_t optional_send_buffer_len, uint32_t write_flags) { /* TODO: don't use magic number */ grpc_op ops[3]; + memset(ops, 0, sizeof(ops)); size_t nops = 1; ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; ops[0].data.send_status_from_server.status = status_code; @@ -743,8 +747,6 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( } if (send_empty_initial_metadata) { ops[nops].op = GRPC_OP_SEND_INITIAL_METADATA; - ops[nops].data.send_initial_metadata.count = 0; - ops[nops].data.send_initial_metadata.metadata = NULL; ops[nops].flags = 0; ops[nops].reserved = NULL; nops++; @@ -784,6 +786,7 @@ grpcsharp_call_send_initial_metadata(grpc_call *call, grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[1]; + memset(ops, 0, sizeof(ops)); ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m index 16e5bff7ff7..a3fa5938cdd 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m @@ -72,6 +72,8 @@ _op.op = GRPC_OP_SEND_INITIAL_METADATA; _op.data.send_initial_metadata.count = metadata.count; _op.data.send_initial_metadata.metadata = metadata.grpc_metadataArray; + _op.data.send_initial_metadata.maybe_compression_level.is_set = false; + _op.data.send_initial_metadata.maybe_compression_level.level = 0; _handler = handler; } return self; diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index 884130e7d4f..0ec502262de 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -292,6 +292,7 @@ PHP_METHOD(Call, startBatch) { grpc_metadata_array_init(&recv_trailing_metadata); MAKE_STD_ZVAL(result); object_init(result); + memset(ops, 0, sizeof(ops)); /* "a" == 1 array */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a", &array) == FAILURE) { diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 0e4d962154f..1a04021c270 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -84,7 +84,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/channel/connected_channel.c', 'src/core/lib/channel/http_client_filter.c', 'src/core/lib/channel/http_server_filter.c', - 'src/core/lib/compression/compression_algorithm.c', + 'src/core/lib/compression/compression.c', 'src/core/lib/compression/message_compress.c', 'src/core/lib/debug/trace.c', 'src/core/lib/http/format_request.c', diff --git a/test/core/bad_client/tests/large_metadata.c b/test/core/bad_client/tests/large_metadata.c index b7d329cb741..ded5f17d4ab 100644 --- a/test/core/bad_client/tests/large_metadata.c +++ b/test/core/bad_client/tests/large_metadata.c @@ -163,6 +163,7 @@ static void server_verifier_sends_too_much_metadata(grpc_server *server, meta.value_length = metadata_value_size; grpc_op op; + memset(&op, 0, sizeof(op)); op.op = GRPC_OP_SEND_INITIAL_METADATA; op.data.send_initial_metadata.count = 1; op.data.send_initial_metadata.metadata = &meta; diff --git a/test/core/bad_ssl/bad_ssl_test.c b/test/core/bad_ssl/bad_ssl_test.c index c4ae212ec29..bb06ab0bb96 100644 --- a/test/core/bad_ssl/bad_ssl_test.c +++ b/test/core/bad_ssl/bad_ssl_test.c @@ -84,6 +84,7 @@ static void run_test(const char *target, size_t nops) { "/foo", "foo.test.google.fr:1234", deadline, NULL); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/channel/channel_args_test.c b/test/core/channel/channel_args_test.c index c2fc05095aa..8ef1bff22e7 100644 --- a/test/core/channel/channel_args_test.c +++ b/test/core/channel/channel_args_test.c @@ -136,8 +136,10 @@ static void test_compression_algorithm_states(void) { int main(int argc, char **argv) { grpc_test_init(argc, argv); + grpc_init(); test_create(); test_set_compression_algorithm(); test_compression_algorithm_states(); + grpc_shutdown(); return 0; } diff --git a/test/core/client_config/lb_policies_test.c b/test/core/client_config/lb_policies_test.c index 786f0b39b27..1534360dea0 100644 --- a/test/core/client_config/lb_policies_test.c +++ b/test/core/client_config/lb_policies_test.c @@ -275,6 +275,7 @@ static int *perform_request(servers_fixture *f, grpc_channel *client, GPR_ASSERT(c); completed_client = 0; + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -327,6 +328,7 @@ static int *perform_request(servers_fixture *f, grpc_channel *client, } if (s_idx >= 0) { + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -415,6 +417,7 @@ static grpc_call **perform_multirequest(servers_fixture *f, kill_server(f, i); } + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/client_config/set_initial_connect_string_test.c b/test/core/client_config/set_initial_connect_string_test.c index 5390c4ab986..438b9492369 100644 --- a/test/core/client_config/set_initial_connect_string_test.c +++ b/test/core/client_config/set_initial_connect_string_test.c @@ -136,6 +136,7 @@ static void start_rpc(int use_creds, int target_port) { state.call = grpc_channel_create_call( state.channel, NULL, GRPC_PROPAGATE_DEFAULTS, state.cq, "/Service/Method", "localhost", gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + memset(&state.op, 0, sizeof(state.op)); state.op.op = GRPC_OP_SEND_INITIAL_METADATA; state.op.data.send_initial_metadata.count = 0; state.op.flags = 0; diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c index 77afe588d79..8e9fa70b0ec 100644 --- a/test/core/end2end/cq_verifier.c +++ b/test/core/end2end/cq_verifier.c @@ -284,6 +284,6 @@ static expectation *add(cq_verifier *v, grpc_completion_type type, void *tag) { return e; } -void cq_expect_completion(cq_verifier *v, void *tag, int success) { +void cq_expect_completion(cq_verifier *v, void *tag, bool success) { add(v, GRPC_OP_COMPLETE, tag)->success = success; } diff --git a/test/core/end2end/cq_verifier.h b/test/core/end2end/cq_verifier.h index b3e07c45a58..8c9a85c2187 100644 --- a/test/core/end2end/cq_verifier.h +++ b/test/core/end2end/cq_verifier.h @@ -34,6 +34,8 @@ #ifndef GRPC_TEST_CORE_END2END_CQ_VERIFIER_H #define GRPC_TEST_CORE_END2END_CQ_VERIFIER_H +#include <stdbool.h> + #include <grpc/grpc.h> #include "test/core/util/test_config.h" @@ -57,7 +59,7 @@ void cq_verify_empty(cq_verifier *v); Any functions taking ... expect a NULL terminated list of key/value pairs (each pair using two parameter slots) of metadata that MUST be present in the event. */ -void cq_expect_completion(cq_verifier *v, void *tag, int success); +void cq_expect_completion(cq_verifier *v, void *tag, bool success); int byte_buffer_eq_string(grpc_byte_buffer *byte_buffer, const char *string); int contains_metadata(grpc_metadata_array *array, const char *key, diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index 202fb3b6a86..46e060e4fe9 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -167,6 +167,7 @@ void test_connect(const char *server_host, const char *client_host, int port, "/foo", "foo.test.google.fr", deadline, NULL); GPR_ASSERT(c); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -201,6 +202,7 @@ void test_connect(const char *server_host, const char *client_host, int port, cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/fixtures/h2_ssl_cert.c b/test/core/end2end/fixtures/h2_ssl_cert.c index 0fa525175ba..b476bf5516d 100644 --- a/test/core/end2end/fixtures/h2_ssl_cert.c +++ b/test/core/end2end/fixtures/h2_ssl_cert.c @@ -321,6 +321,7 @@ static void simple_request_body(grpc_end2end_test_fixture f, NULL); GPR_ASSERT(c); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/fixtures/proxy.c b/test/core/end2end/fixtures/proxy.c index 7f10649fa69..beed80df819 100644 --- a/test/core/end2end/fixtures/proxy.c +++ b/test/core/end2end/fixtures/proxy.c @@ -170,6 +170,7 @@ static void on_p2s_recv_initial_metadata(void *arg, int success) { grpc_op op; grpc_call_error err; + memset(&op, 0, sizeof(op)); if (!pc->proxy->shutdown) { op.op = GRPC_OP_SEND_INITIAL_METADATA; op.flags = 0; @@ -329,6 +330,7 @@ static void on_new_call(void *arg, int success) { if (success) { grpc_op op; + memset(&op, 0, sizeof(op)); proxy_call *pc = gpr_malloc(sizeof(*pc)); memset(pc, 0, sizeof(*pc)); pc->proxy = proxy; diff --git a/test/core/end2end/fuzzers/api_fuzzer.c b/test/core/end2end/fuzzers/api_fuzzer.c index 6bcddbd7699..3cae0bd7782 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.c +++ b/test/core/end2end/fuzzers/api_fuzzer.c @@ -739,6 +739,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { break; } grpc_op *ops = gpr_malloc(sizeof(grpc_op) * num_ops); + memset(ops, 0, sizeof(grpc_op) * num_ops); bool ok = true; size_t i; grpc_op *op; diff --git a/test/core/end2end/fuzzers/client_fuzzer.c b/test/core/end2end/fuzzers/client_fuzzer.c index afcf7638f72..5612b6621a5 100644 --- a/test/core/end2end/fuzzers/client_fuzzer.c +++ b/test/core/end2end/fuzzers/client_fuzzer.c @@ -31,6 +31,8 @@ * */ +#include <string.h> + #include <grpc/grpc.h> #include <grpc/support/alloc.h> @@ -78,6 +80,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { size_t details_capacity = 0; grpc_op ops[6]; + memset(ops, 0, sizeof(ops)); grpc_op *op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/goaway_server_test.c b/test/core/end2end/goaway_server_test.c index 5f8c2641e76..3266793137c 100644 --- a/test/core/end2end/goaway_server_test.c +++ b/test/core/end2end/goaway_server_test.c @@ -132,6 +132,7 @@ int main(int argc, char **argv) { chan, NULL, GRPC_PROPAGATE_DEFAULTS, cq, "/foo", "127.0.0.1", GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20), NULL); /* send initial metadata to probe connectivity */ + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -142,6 +143,7 @@ int main(int argc, char **argv) { (size_t)(op - ops), tag(0x101), NULL)); /* and receive status to probe termination */ + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv1; @@ -183,6 +185,7 @@ int main(int argc, char **argv) { tag(0x9999)); /* listen for close on the server call to probe for finishing */ + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled1; @@ -205,6 +208,7 @@ int main(int argc, char **argv) { chan, NULL, GRPC_PROPAGATE_DEFAULTS, cq, "/foo", "127.0.0.1", GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20), NULL); /* send initial metadata to probe connectivity */ + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -215,6 +219,7 @@ int main(int argc, char **argv) { (size_t)(op - ops), tag(0x201), NULL)); /* and receive status to probe termination */ + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv2; @@ -249,6 +254,7 @@ int main(int argc, char **argv) { cq_verify(cqv); /* listen for close on the server call to probe for finishing */ + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled2; diff --git a/test/core/end2end/invalid_call_argument_test.c b/test/core/end2end/invalid_call_argument_test.c index cf42e92a1a3..35456607012 100644 --- a/test/core/end2end/invalid_call_argument_test.c +++ b/test/core/end2end/invalid_call_argument_test.c @@ -30,11 +30,15 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ + +#include <limits.h> +#include <string.h> + #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> -#include <limits.h> + #include "test/core/end2end/cq_verifier.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" @@ -74,6 +78,7 @@ static void prepare_test(int is_client) { g_state.cqv = cq_verifier_create(g_state.cq); g_state.details = NULL; g_state.details_capacity = 0; + memset(g_state.ops, 0, sizeof(g_state.ops)); if (is_client) { /* create a call, channel to a non existant server */ diff --git a/test/core/end2end/no_server_test.c b/test/core/end2end/no_server_test.c index 7a5cd2335f1..08af3821971 100644 --- a/test/core/end2end/no_server_test.c +++ b/test/core/end2end/no_server_test.c @@ -31,9 +31,12 @@ * */ +#include <string.h> + #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> + #include "test/core/end2end/cq_verifier.h" #include "test/core/util/test_config.h" @@ -65,6 +68,7 @@ int main(int argc, char **argv) { call = grpc_channel_create_call(chan, NULL, GRPC_PROPAGATE_DEFAULTS, cq, "/Foo", "nonexistant", deadline, NULL); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/tests/bad_hostname.c b/test/core/end2end/tests/bad_hostname.c index 3cb9b3d4b12..c9663c2155e 100644 --- a/test/core/end2end/tests/bad_hostname.c +++ b/test/core/end2end/tests/bad_hostname.c @@ -123,6 +123,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/tests/binary_metadata.c b/test/core/end2end/tests/binary_metadata.c index 994c3bf1dd2..3dd26120777 100644 --- a/test/core/end2end/tests/binary_metadata.c +++ b/test/core/end2end/tests/binary_metadata.c @@ -157,6 +157,7 @@ static void test_request_response_with_metadata_and_payload( grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 2; @@ -201,6 +202,7 @@ static void test_request_response_with_metadata_and_payload( cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 2; @@ -219,6 +221,7 @@ static void test_request_response_with_metadata_and_payload( cq_expect_completion(cqv, tag(102), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; diff --git a/test/core/end2end/tests/call_creds.c b/test/core/end2end/tests/call_creds.c index 5c6791f6f78..694a0aa9ef4 100644 --- a/test/core/end2end/tests/call_creds.c +++ b/test/core/end2end/tests/call_creds.c @@ -193,6 +193,7 @@ static void request_response_with_payload_and_call_creds( grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -248,6 +249,7 @@ static void request_response_with_payload_and_call_creds( /* Cannot set creds on the server call object. */ GPR_ASSERT(grpc_call_set_credentials(s, NULL) != GRPC_CALL_OK); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -265,6 +267,7 @@ static void request_response_with_payload_and_call_creds( cq_expect_completion(cqv, tag(102), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; @@ -410,6 +413,7 @@ static void test_request_with_server_rejecting_client_creds( grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c index fc2a64a6c18..51c13da3b1e 100644 --- a/test/core/end2end/tests/cancel_after_accept.c +++ b/test/core/end2end/tests/cancel_after_accept.c @@ -136,6 +136,7 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; @@ -174,6 +175,7 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, cq_expect_completion(cqv, tag(2), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &request_payload_recv; diff --git a/test/core/end2end/tests/cancel_after_client_done.c b/test/core/end2end/tests/cancel_after_client_done.c index 3bafa8c85fd..2b5a409deef 100644 --- a/test/core/end2end/tests/cancel_after_client_done.c +++ b/test/core/end2end/tests/cancel_after_client_done.c @@ -136,6 +136,7 @@ static void test_cancel_after_accept_and_writes_closed( grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; @@ -178,6 +179,7 @@ static void test_cancel_after_accept_and_writes_closed( cq_expect_completion(cqv, tag(2), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &request_payload_recv; diff --git a/test/core/end2end/tests/cancel_after_invoke.c b/test/core/end2end/tests/cancel_after_invoke.c index fc2751af8e1..85fbe9de7bb 100644 --- a/test/core/end2end/tests/cancel_after_invoke.c +++ b/test/core/end2end/tests/cancel_after_invoke.c @@ -131,6 +131,7 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config, grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; diff --git a/test/core/end2end/tests/cancel_before_invoke.c b/test/core/end2end/tests/cancel_before_invoke.c index 33005db9e45..d99062c6082 100644 --- a/test/core/end2end/tests/cancel_before_invoke.c +++ b/test/core/end2end/tests/cancel_before_invoke.c @@ -131,6 +131,7 @@ static void test_cancel_before_invoke(grpc_end2end_test_config config, grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; diff --git a/test/core/end2end/tests/cancel_with_status.c b/test/core/end2end/tests/cancel_with_status.c index c3ee4a6a0ee..83629a9a028 100644 --- a/test/core/end2end/tests/cancel_with_status.c +++ b/test/core/end2end/tests/cancel_with_status.c @@ -122,6 +122,7 @@ static void simple_request_body(grpc_end2end_test_fixture f, size_t num_ops) { grpc_metadata_array_init(&initial_metadata_recv); grpc_metadata_array_init(&trailing_metadata_recv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; diff --git a/test/core/end2end/tests/compressed_payload.c b/test/core/end2end/tests/compressed_payload.c index 589bc314f8e..ec5c012238e 100644 --- a/test/core/end2end/tests/compressed_payload.c +++ b/test/core/end2end/tests/compressed_payload.c @@ -38,13 +38,15 @@ #include <grpc/byte_buffer.h> #include <grpc/byte_buffer_reader.h> +#include <grpc/compression.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/compress_filter.h" +#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/call_test_only.h" #include "test/core/end2end/cq_verifier.h" @@ -102,12 +104,12 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_destroy(f->cq); } -static void request_with_payload_template( +static void request_for_disabled_algorithm( grpc_end2end_test_config config, const char *test_name, uint32_t send_flags_bitmask, - grpc_compression_algorithm requested_compression_algorithm, - grpc_compression_algorithm expected_compression_algorithm, - grpc_metadata *client_metadata) { + grpc_compression_algorithm algorithm_to_disable, + grpc_compression_algorithm requested_client_compression_algorithm, + grpc_status_code expected_error, grpc_metadata *client_metadata) { grpc_call *c; grpc_call *s; gpr_slice request_payload_slice; @@ -137,9 +139,11 @@ static void request_with_payload_template( request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); client_args = grpc_channel_args_set_compression_algorithm( - NULL, requested_compression_algorithm); - server_args = grpc_channel_args_set_compression_algorithm( - NULL, requested_compression_algorithm); + NULL, requested_client_compression_algorithm); + server_args = + grpc_channel_args_set_compression_algorithm(NULL, GRPC_COMPRESS_NONE); + server_args = grpc_channel_args_compression_algorithm_set_state( + &server_args, algorithm_to_disable, false); f = begin_test(config, test_name, client_args, server_args); cqv = cq_verifier_create(f.cq); @@ -153,6 +157,7 @@ static void request_with_payload_template( grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; if (client_metadata != NULL) { @@ -193,11 +198,174 @@ static void request_with_payload_template( grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, f.cq, f.cq, tag(101)); GPR_ASSERT(GRPC_CALL_OK == error); - cq_expect_completion(cqv, tag(101), 1); + cq_expect_completion(cqv, tag(101), true); + cq_verify(cqv); + + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &request_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + cq_expect_completion(cqv, tag(102), false); + + op = ops; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + cq_expect_completion(cqv, tag(103), true); + cq_expect_completion(cqv, tag(1), true); cq_verify(cqv); - GPR_ASSERT( - GPR_BITCOUNT(grpc_call_test_only_get_encodings_accepted_by_peer(s)) == 3); + /* call was cancelled (closed) ... */ + GPR_ASSERT(was_cancelled != 0); + /* with a certain error */ + GPR_ASSERT(status == expected_error); + + char *algo_name = NULL; + GPR_ASSERT(grpc_compression_algorithm_name(algorithm_to_disable, &algo_name)); + char *expected_details = NULL; + gpr_asprintf(&expected_details, "Compression algorithm '%s' is disabled.", + algo_name); + /* and we expect a specific reason for it */ + GPR_ASSERT(0 == strcmp(details, expected_details)); + gpr_free(expected_details); + GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); + GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr")); + + gpr_free(details); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + + grpc_call_destroy(c); + grpc_call_destroy(s); + + cq_verifier_destroy(cqv); + + gpr_slice_unref(request_payload_slice); + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(request_payload_recv); + + grpc_channel_args_destroy(client_args); + grpc_channel_args_destroy(server_args); + + end_test(&f); + config.tear_down_data(&f); +} + +static void request_with_payload_template( + grpc_end2end_test_config config, const char *test_name, + uint32_t client_send_flags_bitmask, + grpc_compression_algorithm default_client_channel_compression_algorithm, + grpc_compression_algorithm default_server_channel_compression_algorithm, + grpc_compression_algorithm expected_algorithm_from_client, + grpc_compression_algorithm expected_algorithm_from_server, + grpc_metadata *client_init_metadata, bool set_server_level, + grpc_compression_level server_compression_level) { + grpc_call *c; + grpc_call *s; + gpr_slice request_payload_slice; + grpc_byte_buffer *request_payload; + gpr_timespec deadline = five_seconds_time(); + grpc_channel_args *client_args; + grpc_channel_args *server_args; + grpc_end2end_test_fixture f; + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_byte_buffer *request_payload_recv = NULL; + grpc_byte_buffer *response_payload; + grpc_byte_buffer *response_payload_recv; + grpc_call_details call_details; + grpc_status_code status; + grpc_call_error error; + char *details = NULL; + size_t details_capacity = 0; + int was_cancelled = 2; + cq_verifier *cqv; + char request_str[1024]; + char response_str[1024]; + + memset(request_str, 'x', 1023); + request_str[1023] = '\0'; + + memset(response_str, 'y', 1023); + response_str[1023] = '\0'; + + request_payload_slice = gpr_slice_from_copied_string(request_str); + gpr_slice response_payload_slice = gpr_slice_from_copied_string(response_str); + + client_args = grpc_channel_args_set_compression_algorithm( + NULL, default_client_channel_compression_algorithm); + server_args = grpc_channel_args_set_compression_algorithm( + NULL, default_server_channel_compression_algorithm); + + f = begin_test(config, test_name, client_args, server_args); + cqv = cq_verifier_create(f.cq); + + c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, + "/foo", "foo.test.google.fr", deadline, NULL); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + if (client_init_metadata != NULL) { + op->data.send_initial_metadata.count = 1; + op->data.send_initial_metadata.metadata = client_init_metadata; + } else { + op->data.send_initial_metadata.count = 0; + } + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(100)); + GPR_ASSERT(GRPC_CALL_OK == error); + cq_expect_completion(cqv, tag(100), true); + cq_verify(cqv); + + GPR_ASSERT(GPR_BITCOUNT(grpc_call_test_only_get_encodings_accepted_by_peer( + s)) == GRPC_COMPRESS_ALGORITHMS_COUNT); GPR_ASSERT(GPR_BITGET(grpc_call_test_only_get_encodings_accepted_by_peer(s), GRPC_COMPRESS_NONE) != 0); GPR_ASSERT(GPR_BITGET(grpc_call_test_only_get_encodings_accepted_by_peer(s), @@ -205,29 +373,107 @@ static void request_with_payload_template( GPR_ASSERT(GPR_BITGET(grpc_call_test_only_get_encodings_accepted_by_peer(s), GRPC_COMPRESS_GZIP) != 0); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; + if (set_server_level) { + op->data.send_initial_metadata.maybe_compression_level.is_set = true; + op->data.send_initial_metadata.maybe_compression_level.level = + server_compression_level; + } op->flags = 0; op->reserved = NULL; op++; - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &request_payload_recv; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; op->flags = 0; op->reserved = NULL; op++; - error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(101), NULL); GPR_ASSERT(GRPC_CALL_OK == error); - cq_expect_completion(cqv, tag(102), 1); - cq_verify(cqv); + for (int i = 0; i < 2; i++) { + request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); + response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = request_payload; + op->flags = client_send_flags_bitmask; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &response_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &request_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + cq_expect_completion(cqv, tag(102), 1); + cq_verify(cqv); + + GPR_ASSERT(request_payload_recv->type == GRPC_BB_RAW); + GPR_ASSERT(byte_buffer_eq_string(request_payload_recv, request_str)); + GPR_ASSERT(request_payload_recv->data.raw.compression == + expected_algorithm_from_client); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = response_payload; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + cq_expect_completion(cqv, tag(103), 1); + cq_expect_completion(cqv, tag(2), 1); + cq_verify(cqv); + + GPR_ASSERT(response_payload_recv->type == GRPC_BB_RAW); + GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, response_str)); + if (server_compression_level > GRPC_COMPRESS_LEVEL_NONE) { + const grpc_compression_algorithm algo_for_server_level = + grpc_call_compression_for_level(s, server_compression_level); + GPR_ASSERT(response_payload_recv->data.raw.compression == + algo_for_server_level); + } else { + GPR_ASSERT(response_payload_recv->data.raw.compression == + expected_algorithm_from_server); + } + + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(response_payload); + grpc_byte_buffer_destroy(request_payload_recv); + grpc_byte_buffer_destroy(response_payload_recv); + } + + gpr_slice_unref(request_payload_slice); + gpr_slice_unref(response_payload_slice); + memset(ops, 0, sizeof(ops)); op = ops; - op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; - op->data.recv_close_on_server.cancelled = &was_cancelled; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; op->flags = 0; op->reserved = NULL; op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + memset(ops, 0, sizeof(ops)); + op = ops; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_OK; @@ -235,11 +481,13 @@ static void request_with_payload_template( op->flags = 0; op->reserved = NULL; op++; - error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(104), NULL); GPR_ASSERT(GRPC_CALL_OK == error); - cq_expect_completion(cqv, tag(103), 1); cq_expect_completion(cqv, tag(1), 1); + cq_expect_completion(cqv, tag(3), 1); + cq_expect_completion(cqv, tag(101), 1); + cq_expect_completion(cqv, tag(104), 1); cq_verify(cqv); GPR_ASSERT(status == GRPC_STATUS_OK); @@ -248,12 +496,6 @@ static void request_with_payload_template( GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr")); GPR_ASSERT(was_cancelled == 0); - GPR_ASSERT(request_payload_recv->type == GRPC_BB_RAW); - GPR_ASSERT(request_payload_recv->data.raw.compression == - expected_compression_algorithm); - - GPR_ASSERT(byte_buffer_eq_string(request_payload_recv, str)); - gpr_free(details); grpc_metadata_array_destroy(&initial_metadata_recv); grpc_metadata_array_destroy(&trailing_metadata_recv); @@ -265,10 +507,6 @@ static void request_with_payload_template( cq_verifier_destroy(cqv); - gpr_slice_unref(request_payload_slice); - grpc_byte_buffer_destroy(request_payload); - grpc_byte_buffer_destroy(request_payload_recv); - grpc_channel_args_destroy(client_args); grpc_channel_args_destroy(server_args); @@ -280,61 +518,90 @@ static void test_invoke_request_with_exceptionally_uncompressed_payload( grpc_end2end_test_config config) { request_with_payload_template( config, "test_invoke_request_with_exceptionally_uncompressed_payload", - GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_NONE, NULL); + GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, + GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP, NULL, false, + /* ignored */ GRPC_COMPRESS_LEVEL_NONE); } static void test_invoke_request_with_uncompressed_payload( grpc_end2end_test_config config) { request_with_payload_template( config, "test_invoke_request_with_uncompressed_payload", 0, - GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, NULL); + GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, + GRPC_COMPRESS_NONE, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE); } static void test_invoke_request_with_compressed_payload( grpc_end2end_test_config config) { request_with_payload_template( config, "test_invoke_request_with_compressed_payload", 0, - GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, NULL); + GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, + GRPC_COMPRESS_GZIP, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE); +} + +static void test_invoke_request_with_server_level( + grpc_end2end_test_config config) { + request_with_payload_template( + config, "test_invoke_request_with_server_level", 0, GRPC_COMPRESS_NONE, + GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE /* ignored */, + NULL, true, GRPC_COMPRESS_LEVEL_HIGH); } static void test_invoke_request_with_compressed_payload_md_override( grpc_end2end_test_config config) { grpc_metadata gzip_compression_override; - grpc_metadata none_compression_override; + grpc_metadata identity_compression_override; - gzip_compression_override.key = GRPC_COMPRESS_REQUEST_ALGORITHM_KEY; + gzip_compression_override.key = GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY; gzip_compression_override.value = "gzip"; - gzip_compression_override.value_length = 4; + gzip_compression_override.value_length = + strlen(gzip_compression_override.value); memset(&gzip_compression_override.internal_data, 0, sizeof(gzip_compression_override.internal_data)); - none_compression_override.key = GRPC_COMPRESS_REQUEST_ALGORITHM_KEY; - none_compression_override.value = "identity"; - none_compression_override.value_length = 4; - memset(&none_compression_override.internal_data, 0, - sizeof(none_compression_override.internal_data)); + identity_compression_override.key = GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY; + identity_compression_override.value = "identity"; + identity_compression_override.value_length = + strlen(identity_compression_override.value); + memset(&identity_compression_override.internal_data, 0, + sizeof(identity_compression_override.internal_data)); /* Channel default NONE (aka IDENTITY), call override to GZIP */ request_with_payload_template( config, "test_invoke_request_with_compressed_payload_md_override_1", 0, - GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP, &gzip_compression_override); + GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP, + GRPC_COMPRESS_NONE, &gzip_compression_override, false, + /*ignored*/ GRPC_COMPRESS_LEVEL_NONE); /* Channel default DEFLATE, call override to GZIP */ request_with_payload_template( config, "test_invoke_request_with_compressed_payload_md_override_2", 0, - GRPC_COMPRESS_DEFLATE, GRPC_COMPRESS_GZIP, &gzip_compression_override); + GRPC_COMPRESS_DEFLATE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP, + GRPC_COMPRESS_NONE, &gzip_compression_override, false, + /*ignored*/ GRPC_COMPRESS_LEVEL_NONE); /* Channel default DEFLATE, call override to NONE (aka IDENTITY) */ request_with_payload_template( config, "test_invoke_request_with_compressed_payload_md_override_3", 0, - GRPC_COMPRESS_DEFLATE, GRPC_COMPRESS_NONE, &none_compression_override); + GRPC_COMPRESS_DEFLATE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, + GRPC_COMPRESS_NONE, &identity_compression_override, false, + /*ignored*/ GRPC_COMPRESS_LEVEL_NONE); +} + +static void test_invoke_request_with_disabled_algorithm( + grpc_end2end_test_config config) { + request_for_disabled_algorithm( + config, "test_invoke_request_with_disabled_algorithm", 0, + GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_STATUS_UNIMPLEMENTED, NULL); } void compressed_payload(grpc_end2end_test_config config) { test_invoke_request_with_exceptionally_uncompressed_payload(config); test_invoke_request_with_uncompressed_payload(config); test_invoke_request_with_compressed_payload(config); + test_invoke_request_with_server_level(config); test_invoke_request_with_compressed_payload_md_override(config); + test_invoke_request_with_disabled_algorithm(config); } void compressed_payload_pre_init(void) {} diff --git a/test/core/end2end/tests/default_host.c b/test/core/end2end/tests/default_host.c index 44384a783ec..728ee597b50 100644 --- a/test/core/end2end/tests/default_host.c +++ b/test/core/end2end/tests/default_host.c @@ -131,6 +131,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -173,6 +174,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { gpr_log(GPR_DEBUG, "client_peer=%s", peer); gpr_free(peer); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/tests/disappearing_server.c b/test/core/end2end/tests/disappearing_server.c index 03d1ded04cf..536fbd0d8a8 100644 --- a/test/core/end2end/tests/disappearing_server.c +++ b/test/core/end2end/tests/disappearing_server.c @@ -108,6 +108,7 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f, grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -145,6 +146,7 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f, - and still complete the request */ grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c index 29244d94287..e6edb213cc9 100644 --- a/test/core/end2end/tests/filter_causes_close.c +++ b/test/core/end2end/tests/filter_causes_close.c @@ -134,6 +134,7 @@ static void test_request(grpc_end2end_test_config config) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/tests/graceful_server_shutdown.c b/test/core/end2end/tests/graceful_server_shutdown.c index 26198f3bdfc..f527b8617d9 100644 --- a/test/core/end2end/tests/graceful_server_shutdown.c +++ b/test/core/end2end/tests/graceful_server_shutdown.c @@ -122,6 +122,7 @@ static void test_early_server_shutdown_finishes_inflight_calls( grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -160,6 +161,7 @@ static void test_early_server_shutdown_finishes_inflight_calls( grpc_server_shutdown_and_notify(f.server, f.cq, tag(0xdead)); cq_verify_empty(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/tests/high_initial_seqno.c b/test/core/end2end/tests/high_initial_seqno.c index 374606dcb73..50e3c9cb898 100644 --- a/test/core/end2end/tests/high_initial_seqno.c +++ b/test/core/end2end/tests/high_initial_seqno.c @@ -128,6 +128,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -161,6 +162,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/tests/hpack_size.c b/test/core/end2end/tests/hpack_size.c index 07d5d387b4c..ee889b77a1c 100644 --- a/test/core/end2end/tests/hpack_size.c +++ b/test/core/end2end/tests/hpack_size.c @@ -323,6 +323,7 @@ static void simple_request_body(grpc_end2end_test_fixture f, size_t index) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = GPR_ARRAY_SIZE(extra_metadata); @@ -357,6 +358,7 @@ static void simple_request_body(grpc_end2end_test_fixture f, size_t index) { cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/tests/idempotent_request.c b/test/core/end2end/tests/idempotent_request.c index e53f3b2aaa8..dfedcfebee8 100644 --- a/test/core/end2end/tests/idempotent_request.c +++ b/test/core/end2end/tests/idempotent_request.c @@ -132,6 +132,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -174,6 +175,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { gpr_log(GPR_DEBUG, "client_peer=%s", peer); gpr_free(peer); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c index 6410305451a..9c9ca951293 100644 --- a/test/core/end2end/tests/invoke_large_request.c +++ b/test/core/end2end/tests/invoke_large_request.c @@ -138,6 +138,7 @@ static void test_invoke_large_request(grpc_end2end_test_config config) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -181,6 +182,7 @@ static void test_invoke_large_request(grpc_end2end_test_config config) { cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -198,6 +200,7 @@ static void test_invoke_large_request(grpc_end2end_test_config config) { cq_expect_completion(cqv, tag(102), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; diff --git a/test/core/end2end/tests/large_metadata.c b/test/core/end2end/tests/large_metadata.c index ae1f68a2b45..1f278960196 100644 --- a/test/core/end2end/tests/large_metadata.c +++ b/test/core/end2end/tests/large_metadata.c @@ -142,6 +142,7 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); // Client: send request. op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; @@ -183,6 +184,7 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) { cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); // Server: send initial metadata and receive request. op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; @@ -201,6 +203,7 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) { cq_expect_completion(cqv, tag(102), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); // Server: receive close and send status. This should trigger // completion of request on client. op = ops; diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c index 1bb53073cb0..41de74ff874 100644 --- a/test/core/end2end/tests/max_concurrent_streams.c +++ b/test/core/end2end/tests/max_concurrent_streams.c @@ -124,6 +124,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -157,6 +158,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -270,6 +272,7 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { f.server, &s1, &call_details, &request_metadata_recv, f.cq, f.cq, tag(101))); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -283,6 +286,7 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { error = grpc_call_start_batch(c1, ops, (size_t)(op - ops), tag(301), NULL); GPR_ASSERT(GRPC_CALL_OK == error); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv1; @@ -300,6 +304,7 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { error = grpc_call_start_batch(c1, ops, (size_t)(op - ops), tag(302), NULL); GPR_ASSERT(GRPC_CALL_OK == error); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -313,6 +318,7 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { error = grpc_call_start_batch(c2, ops, (size_t)(op - ops), tag(401), NULL); GPR_ASSERT(GRPC_CALL_OK == error); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv2; @@ -354,6 +360,7 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { } GPR_ASSERT(live_call == 300 || live_call == 400); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -388,6 +395,7 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { cq_expect_completion(cqv, tag(201), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/tests/max_message_length.c b/test/core/end2end/tests/max_message_length.c index 4f572789d95..08d326ab4d6 100644 --- a/test/core/end2end/tests/max_message_length.c +++ b/test/core/end2end/tests/max_message_length.c @@ -143,6 +143,7 @@ static void test_max_message_length(grpc_end2end_test_config config) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -181,6 +182,7 @@ static void test_max_message_length(grpc_end2end_test_config config) { cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; diff --git a/test/core/end2end/tests/negative_deadline.c b/test/core/end2end/tests/negative_deadline.c index 03e57a90f6f..fd56c8b4ff1 100644 --- a/test/core/end2end/tests/negative_deadline.c +++ b/test/core/end2end/tests/negative_deadline.c @@ -122,6 +122,7 @@ static void simple_request_body(grpc_end2end_test_fixture f, size_t num_ops) { grpc_metadata_array_init(&initial_metadata_recv); grpc_metadata_array_init(&trailing_metadata_recv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; diff --git a/test/core/end2end/tests/payload.c b/test/core/end2end/tests/payload.c index bdfb1354068..443d85eecc4 100644 --- a/test/core/end2end/tests/payload.c +++ b/test/core/end2end/tests/payload.c @@ -131,6 +131,7 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -174,6 +175,7 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) { cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -191,6 +193,7 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) { cq_expect_completion(cqv, tag(102), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; diff --git a/test/core/end2end/tests/ping_pong_streaming.c b/test/core/end2end/tests/ping_pong_streaming.c index 15e1c6e338a..1d2f7943c1d 100644 --- a/test/core/end2end/tests/ping_pong_streaming.c +++ b/test/core/end2end/tests/ping_pong_streaming.c @@ -135,6 +135,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -164,6 +165,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, cq_expect_completion(cqv, tag(100), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -182,6 +184,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = request_payload; @@ -196,6 +199,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL); GPR_ASSERT(GRPC_CALL_OK == error); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &request_payload_recv; @@ -207,6 +211,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, cq_expect_completion(cqv, tag(102), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = response_payload; @@ -228,6 +233,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, gpr_slice_unref(request_payload_slice); gpr_slice_unref(response_payload_slice); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; op->flags = 0; @@ -236,6 +242,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL); GPR_ASSERT(GRPC_CALL_OK == error); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; diff --git a/test/core/end2end/tests/registered_call.c b/test/core/end2end/tests/registered_call.c index 3c4edbae7dc..ece6250ea13 100644 --- a/test/core/end2end/tests/registered_call.c +++ b/test/core/end2end/tests/registered_call.c @@ -125,6 +125,7 @@ static void simple_request_body(grpc_end2end_test_fixture f, void *rc) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -158,6 +159,7 @@ static void simple_request_body(grpc_end2end_test_fixture f, void *rc) { cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/tests/request_with_flags.c b/test/core/end2end/tests/request_with_flags.c index f5a8d4d60cf..b5d398bba9f 100644 --- a/test/core/end2end/tests/request_with_flags.c +++ b/test/core/end2end/tests/request_with_flags.c @@ -131,6 +131,7 @@ static void test_invoke_request_with_flags( grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c index 77064040909..d94267e09cf 100644 --- a/test/core/end2end/tests/request_with_payload.c +++ b/test/core/end2end/tests/request_with_payload.c @@ -130,6 +130,7 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -167,6 +168,7 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) { cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -184,6 +186,7 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) { cq_expect_completion(cqv, tag(102), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; diff --git a/test/core/end2end/tests/server_finishes_request.c b/test/core/end2end/tests/server_finishes_request.c index a4f5319e5b4..a723c6fd2ca 100644 --- a/test/core/end2end/tests/server_finishes_request.c +++ b/test/core/end2end/tests/server_finishes_request.c @@ -126,6 +126,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -155,6 +156,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/tests/shutdown_finishes_calls.c b/test/core/end2end/tests/shutdown_finishes_calls.c index 80287cd507a..abb6b26a875 100644 --- a/test/core/end2end/tests/shutdown_finishes_calls.c +++ b/test/core/end2end/tests/shutdown_finishes_calls.c @@ -115,6 +115,7 @@ static void test_early_server_shutdown_finishes_inflight_calls( grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -149,6 +150,7 @@ static void test_early_server_shutdown_finishes_inflight_calls( cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c index 400b3a00274..e1fcc035bbd 100644 --- a/test/core/end2end/tests/simple_delayed_request.c +++ b/test/core/end2end/tests/simple_delayed_request.c @@ -117,6 +117,7 @@ static void simple_delayed_request_body(grpc_end2end_test_config config, grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -152,6 +153,7 @@ static void simple_delayed_request_body(grpc_end2end_test_config config, cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/tests/simple_metadata.c b/test/core/end2end/tests/simple_metadata.c index 707b3c9512c..c9b1a03da52 100644 --- a/test/core/end2end/tests/simple_metadata.c +++ b/test/core/end2end/tests/simple_metadata.c @@ -141,6 +141,7 @@ static void test_request_response_with_metadata_and_payload( grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 2; @@ -185,6 +186,7 @@ static void test_request_response_with_metadata_and_payload( cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 2; @@ -203,6 +205,7 @@ static void test_request_response_with_metadata_and_payload( cq_expect_completion(cqv, tag(102), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c index 42108425e6f..a8014e6894c 100644 --- a/test/core/end2end/tests/simple_request.c +++ b/test/core/end2end/tests/simple_request.c @@ -132,6 +132,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -174,6 +175,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { gpr_log(GPR_DEBUG, "client_peer=%s", peer); gpr_free(peer); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/end2end/tests/trailing_metadata.c b/test/core/end2end/tests/trailing_metadata.c index 4dd8c12cba9..41e0f009113 100644 --- a/test/core/end2end/tests/trailing_metadata.c +++ b/test/core/end2end/tests/trailing_metadata.c @@ -144,6 +144,7 @@ static void test_request_response_with_metadata_and_payload( grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 2; @@ -188,6 +189,7 @@ static void test_request_response_with_metadata_and_payload( cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 2; @@ -206,6 +208,7 @@ static void test_request_response_with_metadata_and_payload( cq_expect_completion(cqv, tag(102), 1); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; diff --git a/test/core/fling/client.c b/test/core/fling/client.c index 81562277ec0..123f2b5bbed 100644 --- a/test/core/fling/client.c +++ b/test/core/fling/client.c @@ -65,6 +65,7 @@ static void init_ping_pong_request(void) { grpc_metadata_array_init(&initial_metadata_recv); grpc_metadata_array_init(&trailing_metadata_recv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; diff --git a/test/core/surface/lame_client_test.c b/test/core/surface/lame_client_test.c index 3286db5b1b8..edd50568d6c 100644 --- a/test/core/surface/lame_client_test.c +++ b/test/core/surface/lame_client_test.c @@ -115,6 +115,7 @@ int main(int argc, char **argv) { GPR_ASSERT(call); cqv = cq_verifier_create(cq); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -133,6 +134,7 @@ int main(int argc, char **argv) { cq_expect_completion(cqv, tag(1), 0); cq_verify(cqv); + memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 64ab164d570..10c8cbd5492 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -948,7 +948,7 @@ src/core/lib/channel/compress_filter.c \ src/core/lib/channel/connected_channel.c \ src/core/lib/channel/http_client_filter.c \ src/core/lib/channel/http_server_filter.c \ -src/core/lib/compression/compression_algorithm.c \ +src/core/lib/compression/compression.c \ src/core/lib/compression/message_compress.c \ src/core/lib/debug/trace.c \ src/core/lib/http/format_request.c \ diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index cff4918fc3f..fd522ee1739 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -5733,7 +5733,7 @@ "src/core/lib/channel/http_server_filter.c", "src/core/lib/channel/http_server_filter.h", "src/core/lib/compression/algorithm_metadata.h", - "src/core/lib/compression/compression_algorithm.c", + "src/core/lib/compression/compression.c", "src/core/lib/compression/message_compress.c", "src/core/lib/compression/message_compress.h", "src/core/lib/debug/trace.c", diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index 4327ab93173..1824b9272d7 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -467,7 +467,7 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.c"> </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\lib\compression\compression_algorithm.c"> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\compression\compression.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\compression\message_compress.c"> </ClCompile> diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index 860575cc09d..e77b79e4c26 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -25,7 +25,7 @@ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.c"> <Filter>src\core\lib\channel</Filter> </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\lib\compression\compression_algorithm.c"> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\compression\compression.c"> <Filter>src\core\lib\compression</Filter> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\compression\message_compress.c"> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index 3695ad44a0c..bc5b153534d 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -435,7 +435,7 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.c"> </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\lib\compression\compression_algorithm.c"> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\compression\compression.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\compression\message_compress.c"> </ClCompile> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index a22e199f593..792a2dbdaab 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -28,7 +28,7 @@ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.c"> <Filter>src\core\lib\channel</Filter> </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\lib\compression\compression_algorithm.c"> + <ClCompile Include="$(SolutionDir)\..\src\core\lib\compression\compression.c"> <Filter>src\core\lib\compression</Filter> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\lib\compression\message_compress.c">