diff --git a/BUILD b/BUILD index 81de8203a03..9dcca83cace 100644 --- a/BUILD +++ b/BUILD @@ -1193,11 +1193,13 @@ grpc_cc_library( "src/core/ext/filters/http/client/http_client_filter.cc", "src/core/ext/filters/http/http_filters_plugin.cc", "src/core/ext/filters/http/message_compress/message_compress_filter.cc", + "src/core/ext/filters/http/message_compress/message_decompress_filter.cc", "src/core/ext/filters/http/server/http_server_filter.cc", ], hdrs = [ "src/core/ext/filters/http/client/http_client_filter.h", "src/core/ext/filters/http/message_compress/message_compress_filter.h", + "src/core/ext/filters/http/message_compress/message_decompress_filter.h", "src/core/ext/filters/http/server/http_server_filter.h", ], language = "c++", diff --git a/BUILD.gn b/BUILD.gn index 129ad2b2639..f1536af4463 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -318,6 +318,8 @@ config("grpc_config") { "src/core/ext/filters/http/http_filters_plugin.cc", "src/core/ext/filters/http/message_compress/message_compress_filter.cc", "src/core/ext/filters/http/message_compress/message_compress_filter.h", + "src/core/ext/filters/http/message_compress/message_decompress_filter.cc", + "src/core/ext/filters/http/message_compress/message_decompress_filter.h", "src/core/ext/filters/http/server/http_server_filter.cc", "src/core/ext/filters/http/server/http_server_filter.h", "src/core/ext/filters/max_age/max_age_filter.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index 4fc05f7c174..915c126b6e4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1369,6 +1369,7 @@ add_library(grpc src/core/ext/filters/http/client_authority_filter.cc src/core/ext/filters/http/http_filters_plugin.cc src/core/ext/filters/http/message_compress/message_compress_filter.cc + src/core/ext/filters/http/message_compress/message_decompress_filter.cc src/core/ext/filters/http/server/http_server_filter.cc src/core/ext/filters/max_age/max_age_filter.cc src/core/ext/filters/message_size/message_size_filter.cc @@ -2028,6 +2029,7 @@ add_library(grpc_unsecure src/core/ext/filters/http/client_authority_filter.cc src/core/ext/filters/http/http_filters_plugin.cc src/core/ext/filters/http/message_compress/message_compress_filter.cc + src/core/ext/filters/http/message_compress/message_decompress_filter.cc src/core/ext/filters/http/server/http_server_filter.cc src/core/ext/filters/max_age/max_age_filter.cc src/core/ext/filters/message_size/message_size_filter.cc diff --git a/Makefile b/Makefile index 92ac929d3bc..53e038705fa 100644 --- a/Makefile +++ b/Makefile @@ -3694,6 +3694,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/http_filters_plugin.cc \ src/core/ext/filters/http/message_compress/message_compress_filter.cc \ + src/core/ext/filters/http/message_compress/message_decompress_filter.cc \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/max_age/max_age_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \ @@ -4327,6 +4328,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/http_filters_plugin.cc \ src/core/ext/filters/http/message_compress/message_compress_filter.cc \ + src/core/ext/filters/http/message_compress/message_decompress_filter.cc \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/max_age/max_age_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 0f98d3a24ae..b688b8b5102 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -423,6 +423,7 @@ libs: - src/core/ext/filters/http/client/http_client_filter.h - src/core/ext/filters/http/client_authority_filter.h - src/core/ext/filters/http/message_compress/message_compress_filter.h + - src/core/ext/filters/http/message_compress/message_decompress_filter.h - src/core/ext/filters/http/server/http_server_filter.h - src/core/ext/filters/max_age/max_age_filter.h - src/core/ext/filters/message_size/message_size_filter.h @@ -795,6 +796,7 @@ libs: - src/core/ext/filters/http/client_authority_filter.cc - src/core/ext/filters/http/http_filters_plugin.cc - src/core/ext/filters/http/message_compress/message_compress_filter.cc + - src/core/ext/filters/http/message_compress/message_decompress_filter.cc - src/core/ext/filters/http/server/http_server_filter.cc - src/core/ext/filters/max_age/max_age_filter.cc - src/core/ext/filters/message_size/message_size_filter.cc @@ -1325,6 +1327,7 @@ libs: - src/core/ext/filters/http/client/http_client_filter.h - src/core/ext/filters/http/client_authority_filter.h - src/core/ext/filters/http/message_compress/message_compress_filter.h + - src/core/ext/filters/http/message_compress/message_decompress_filter.h - src/core/ext/filters/http/server/http_server_filter.h - src/core/ext/filters/max_age/max_age_filter.h - src/core/ext/filters/message_size/message_size_filter.h @@ -1632,6 +1635,7 @@ libs: - src/core/ext/filters/http/client_authority_filter.cc - src/core/ext/filters/http/http_filters_plugin.cc - src/core/ext/filters/http/message_compress/message_compress_filter.cc + - src/core/ext/filters/http/message_compress/message_decompress_filter.cc - src/core/ext/filters/http/server/http_server_filter.cc - src/core/ext/filters/max_age/max_age_filter.cc - src/core/ext/filters/message_size/message_size_filter.cc diff --git a/config.m4 b/config.m4 index 6b450315f4f..08e8d159803 100644 --- a/config.m4 +++ b/config.m4 @@ -104,6 +104,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/http_filters_plugin.cc \ src/core/ext/filters/http/message_compress/message_compress_filter.cc \ + src/core/ext/filters/http/message_compress/message_decompress_filter.cc \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/max_age/max_age_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \ diff --git a/config.w32 b/config.w32 index 59cdbdb1ce4..557bfc033fb 100644 --- a/config.w32 +++ b/config.w32 @@ -73,6 +73,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\http\\client_authority_filter.cc " + "src\\core\\ext\\filters\\http\\http_filters_plugin.cc " + "src\\core\\ext\\filters\\http\\message_compress\\message_compress_filter.cc " + + "src\\core\\ext\\filters\\http\\message_compress\\message_decompress_filter.cc " + "src\\core\\ext\\filters\\http\\server\\http_server_filter.cc " + "src\\core\\ext\\filters\\max_age\\max_age_filter.cc " + "src\\core\\ext\\filters\\message_size\\message_size_filter.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 77980999b59..6393cfb5217 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -274,6 +274,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/http/client/http_client_filter.h', 'src/core/ext/filters/http/client_authority_filter.h', 'src/core/ext/filters/http/message_compress/message_compress_filter.h', + 'src/core/ext/filters/http/message_compress/message_decompress_filter.h', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/max_age/max_age_filter.h', 'src/core/ext/filters/message_size/message_size_filter.h', @@ -725,6 +726,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/http/client/http_client_filter.h', 'src/core/ext/filters/http/client_authority_filter.h', 'src/core/ext/filters/http/message_compress/message_compress_filter.h', + 'src/core/ext/filters/http/message_compress/message_decompress_filter.h', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/max_age/max_age_filter.h', 'src/core/ext/filters/message_size/message_size_filter.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 749e77b3fa7..ed08bfd08c5 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -301,6 +301,8 @@ Pod::Spec.new do |s| 'src/core/ext/filters/http/http_filters_plugin.cc', 'src/core/ext/filters/http/message_compress/message_compress_filter.cc', 'src/core/ext/filters/http/message_compress/message_compress_filter.h', + 'src/core/ext/filters/http/message_compress/message_decompress_filter.cc', + 'src/core/ext/filters/http/message_compress/message_decompress_filter.h', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/max_age/max_age_filter.cc', @@ -1078,6 +1080,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/http/client/http_client_filter.h', 'src/core/ext/filters/http/client_authority_filter.h', 'src/core/ext/filters/http/message_compress/message_compress_filter.h', + 'src/core/ext/filters/http/message_compress/message_decompress_filter.h', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/max_age/max_age_filter.h', 'src/core/ext/filters/message_size/message_size_filter.h', diff --git a/grpc.gemspec b/grpc.gemspec index 29e5d3d436b..8de1b7ec83a 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -223,6 +223,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/http/http_filters_plugin.cc ) s.files += %w( src/core/ext/filters/http/message_compress/message_compress_filter.cc ) s.files += %w( src/core/ext/filters/http/message_compress/message_compress_filter.h ) + s.files += %w( src/core/ext/filters/http/message_compress/message_decompress_filter.cc ) + s.files += %w( src/core/ext/filters/http/message_compress/message_decompress_filter.h ) s.files += %w( src/core/ext/filters/http/server/http_server_filter.cc ) s.files += %w( src/core/ext/filters/http/server/http_server_filter.h ) s.files += %w( src/core/ext/filters/max_age/max_age_filter.cc ) diff --git a/grpc.gyp b/grpc.gyp index a62c034d65a..c43d6459f8e 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -497,6 +497,7 @@ 'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/http_filters_plugin.cc', 'src/core/ext/filters/http/message_compress/message_compress_filter.cc', + 'src/core/ext/filters/http/message_compress/message_decompress_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', @@ -992,6 +993,7 @@ 'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/http_filters_plugin.cc', 'src/core/ext/filters/http/message_compress/message_compress_filter.cc', + 'src/core/ext/filters/http/message_compress/message_decompress_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index c7dd23caa49..a57d07c5431 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -174,6 +174,11 @@ typedef struct { /** Enable/disable support for per-message compression. Defaults to 1, unless GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0. */ #define GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION "grpc.per_message_compression" +/** Experimental Arg. Enable/disable support for per-message decompression. + Defaults to 1. If disabled, decompression will not be performed and the + application will see the compressed message in the byte buffer. */ +#define GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION \ + "grpc.per_message_decompression" /** Enable/disable support for deadline checking. Defaults to 1, unless GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0 */ #define GRPC_ARG_ENABLE_DEADLINE_CHECKS "grpc.enable_deadline_checking" diff --git a/package.xml b/package.xml index 06d1669ec48..3481a55289b 100644 --- a/package.xml +++ b/package.xml @@ -203,6 +203,8 @@ + + diff --git a/src/core/ext/filters/http/http_filters_plugin.cc b/src/core/ext/filters/http/http_filters_plugin.cc index f03fa0141df..4094ffa8a03 100644 --- a/src/core/ext/filters/http/http_filters_plugin.cc +++ b/src/core/ext/filters/http/http_filters_plugin.cc @@ -22,6 +22,7 @@ #include "src/core/ext/filters/http/client/http_client_filter.h" #include "src/core/ext/filters/http/message_compress/message_compress_filter.h" +#include "src/core/ext/filters/http/message_compress/message_decompress_filter.h" #include "src/core/ext/filters/http/server/http_server_filter.h" #include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/surface/call.h" @@ -36,12 +37,16 @@ typedef struct { static optional_filter compress_filter = { &grpc_message_compress_filter, GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION}; +static optional_filter decompress_filter = { + &grpc_message_decompress_filter, GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION}; + static bool is_building_http_like_transport( grpc_channel_stack_builder* builder) { grpc_transport* t = grpc_channel_stack_builder_get_transport(builder); return t != nullptr && strstr(t->vtable->name, "http"); } +template static bool maybe_add_optional_filter(grpc_channel_stack_builder* builder, void* arg) { if (!is_building_http_like_transport(builder)) return true; @@ -50,7 +55,8 @@ static bool maybe_add_optional_filter(grpc_channel_stack_builder* builder, grpc_channel_stack_builder_get_channel_arguments(builder); bool enable = grpc_channel_arg_get_bool( grpc_channel_args_find(channel_args, filtarg->control_channel_arg), - !grpc_channel_args_want_minimal_stack(channel_args)); + enable_in_minimal_stack || + !grpc_channel_args_want_minimal_stack(channel_args)); return enable ? grpc_channel_stack_builder_prepend_filter( builder, filtarg->filter, nullptr, nullptr) : true; @@ -66,15 +72,24 @@ static bool maybe_add_required_filter(grpc_channel_stack_builder* builder, } void grpc_http_filters_init(void) { - grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, - GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - maybe_add_optional_filter, &compress_filter); - grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, - GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - maybe_add_optional_filter, &compress_filter); - grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, - GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - maybe_add_optional_filter, &compress_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_optional_filter, &compress_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_optional_filter, &compress_filter); + grpc_channel_init_register_stage( + GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_optional_filter, &compress_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_optional_filter, &decompress_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_optional_filter, &decompress_filter); + grpc_channel_init_register_stage( + GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_optional_filter, &decompress_filter); grpc_channel_init_register_stage( GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, maybe_add_required_filter, (void*)&grpc_http_client_filter); diff --git a/src/core/ext/filters/http/message_compress/message_decompress_filter.cc b/src/core/ext/filters/http/message_compress/message_decompress_filter.cc new file mode 100644 index 00000000000..d12f4013bb2 --- /dev/null +++ b/src/core/ext/filters/http/message_compress/message_decompress_filter.cc @@ -0,0 +1,358 @@ +// +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include "src/core/ext/filters/http/message_compress/message_decompress_filter.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/compression/algorithm_metadata.h" +#include "src/core/lib/compression/compression_args.h" +#include "src/core/lib/compression/compression_internal.h" +#include "src/core/lib/compression/message_compress.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" + +namespace { + +class ChannelData {}; + +class CallData { + public: + explicit CallData(const grpc_call_element_args& args) + : call_combiner_(args.call_combiner) { + // Initialize state for recv_initial_metadata_ready callback + GRPC_CLOSURE_INIT(&on_recv_initial_metadata_ready_, + OnRecvInitialMetadataReady, this, + grpc_schedule_on_exec_ctx); + // Initialize state for recv_message_ready callback + grpc_slice_buffer_init(&recv_slices_); + GRPC_CLOSURE_INIT(&on_recv_message_next_done_, OnRecvMessageNextDone, this, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_recv_message_ready_, OnRecvMessageReady, this, + grpc_schedule_on_exec_ctx); + // Initialize state for recv_trailing_metadata_ready callback + GRPC_CLOSURE_INIT(&on_recv_trailing_metadata_ready_, + OnRecvTrailingMetadataReady, this, + grpc_schedule_on_exec_ctx); + } + + ~CallData() { grpc_slice_buffer_destroy_internal(&recv_slices_); } + + void DecompressStartTransportStreamOpBatch( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch); + + private: + static void OnRecvInitialMetadataReady(void* arg, grpc_error* error); + + // Methods for processing a receive message event + void MaybeResumeOnRecvMessageReady(); + static void OnRecvMessageReady(void* arg, grpc_error* error); + static void OnRecvMessageNextDone(void* arg, grpc_error* error); + grpc_error* PullSliceFromRecvMessage(); + void ContinueReadingRecvMessage(); + void FinishRecvMessage(); + void ContinueRecvMessageReadyCallback(grpc_error* error); + + // Methods for processing a recv_trailing_metadata event + void MaybeResumeOnRecvTrailingMetadataReady(); + static void OnRecvTrailingMetadataReady(void* arg, grpc_error* error); + + grpc_core::CallCombiner* call_combiner_; + // Overall error for the call + grpc_error* error_ = GRPC_ERROR_NONE; + // Fields for handling recv_initial_metadata_ready callback + grpc_closure on_recv_initial_metadata_ready_; + grpc_closure* original_recv_initial_metadata_ready_ = nullptr; + grpc_metadata_batch* recv_initial_metadata_ = nullptr; + // Fields for handling recv_message_ready callback + bool seen_recv_message_ready_ = false; + grpc_message_compression_algorithm algorithm_ = GRPC_MESSAGE_COMPRESS_NONE; + grpc_closure on_recv_message_ready_; + grpc_closure* original_recv_message_ready_ = nullptr; + grpc_closure on_recv_message_next_done_; + grpc_core::OrphanablePtr* recv_message_ = nullptr; + // recv_slices_ holds the slices read from the original recv_message stream. + // It is initialized during construction and reset when a new stream is + // created using it. + grpc_slice_buffer recv_slices_; + std::aligned_storage::type + recv_replacement_stream_; + // Fields for handling recv_trailing_metadata_ready callback + bool seen_recv_trailing_metadata_ready_ = false; + grpc_closure on_recv_trailing_metadata_ready_; + grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; + grpc_error* on_recv_trailing_metadata_ready_error_ = GRPC_ERROR_NONE; +}; + +grpc_message_compression_algorithm DecodeMessageCompressionAlgorithm( + grpc_mdelem md) { + grpc_message_compression_algorithm algorithm = + grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md)); + if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) { + char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + gpr_log(GPR_ERROR, + "Invalid incoming message compression algorithm: '%s'. " + "Interpreting incoming data as uncompressed.", + md_c_str); + gpr_free(md_c_str); + return GRPC_MESSAGE_COMPRESS_NONE; + } + return algorithm; +} + +void CallData::OnRecvInitialMetadataReady(void* arg, grpc_error* error) { + CallData* calld = static_cast(arg); + if (error == GRPC_ERROR_NONE) { + grpc_linked_mdelem* grpc_encoding = + calld->recv_initial_metadata_->idx.named.grpc_encoding; + if (grpc_encoding != nullptr) { + calld->algorithm_ = DecodeMessageCompressionAlgorithm(grpc_encoding->md); + } + } + calld->MaybeResumeOnRecvMessageReady(); + calld->MaybeResumeOnRecvTrailingMetadataReady(); + grpc_closure* closure = calld->original_recv_initial_metadata_ready_; + calld->original_recv_initial_metadata_ready_ = nullptr; + grpc_core::Closure::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(error)); +} + +void CallData::MaybeResumeOnRecvMessageReady() { + if (seen_recv_message_ready_) { + seen_recv_message_ready_ = false; + GRPC_CALL_COMBINER_START(call_combiner_, &on_recv_message_ready_, + GRPC_ERROR_NONE, + "continue recv_message_ready callback"); + } +} + +void CallData::OnRecvMessageReady(void* arg, grpc_error* error) { + CallData* calld = static_cast(arg); + if (error == GRPC_ERROR_NONE) { + if (calld->original_recv_initial_metadata_ready_ != nullptr) { + calld->seen_recv_message_ready_ = true; + GRPC_CALL_COMBINER_STOP(calld->call_combiner_, + "Deferring OnRecvMessageReady until after " + "OnRecvInitialMetadataReady"); + return; + } + if (calld->algorithm_ != GRPC_MESSAGE_COMPRESS_NONE) { + // recv_message can be NULL if trailing metadata is received instead of + // message, or it's possible that the message was not compressed. + if (*calld->recv_message_ == nullptr || + (*calld->recv_message_)->length() == 0 || + ((*calld->recv_message_)->flags() & GRPC_WRITE_INTERNAL_COMPRESS) == + 0) { + return calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_NONE); + } + grpc_slice_buffer_destroy_internal(&calld->recv_slices_); + grpc_slice_buffer_init(&calld->recv_slices_); + return calld->ContinueReadingRecvMessage(); + } + } + calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error)); +} + +void CallData::ContinueReadingRecvMessage() { + while ((*recv_message_) + ->Next((*recv_message_)->length() - recv_slices_.length, + &on_recv_message_next_done_)) { + grpc_error* error = PullSliceFromRecvMessage(); + if (error != GRPC_ERROR_NONE) { + return ContinueRecvMessageReadyCallback(error); + } + // We have read the entire message. + if (recv_slices_.length == (*recv_message_)->length()) { + return FinishRecvMessage(); + } + } +} + +grpc_error* CallData::PullSliceFromRecvMessage() { + grpc_slice incoming_slice; + grpc_error* error = (*recv_message_)->Pull(&incoming_slice); + if (error == GRPC_ERROR_NONE) { + grpc_slice_buffer_add(&recv_slices_, incoming_slice); + } + return error; +} + +void CallData::OnRecvMessageNextDone(void* arg, grpc_error* error) { + CallData* calld = static_cast(arg); + if (error != GRPC_ERROR_NONE) { + return calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error)); + } + error = calld->PullSliceFromRecvMessage(); + if (error != GRPC_ERROR_NONE) { + return calld->ContinueRecvMessageReadyCallback(error); + } + if (calld->recv_slices_.length == (*calld->recv_message_)->length()) { + calld->FinishRecvMessage(); + } else { + calld->ContinueReadingRecvMessage(); + } +} + +void CallData::FinishRecvMessage() { + grpc_slice_buffer decompressed_slices; + grpc_slice_buffer_init(&decompressed_slices); + if (grpc_msg_decompress(algorithm_, &recv_slices_, &decompressed_slices) == + 0) { + char* msg; + gpr_asprintf( + &msg, + "Unexpected error decompressing data for algorithm with enum value %d", + algorithm_); + GPR_DEBUG_ASSERT(error_ == GRPC_ERROR_NONE); + error_ = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); + gpr_free(msg); + grpc_slice_buffer_destroy_internal(&decompressed_slices); + } else { + uint32_t recv_flags = + ((*recv_message_)->flags() & (~GRPC_WRITE_INTERNAL_COMPRESS)) | + GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED; + // Swap out the original receive byte stream with our new one and send the + // batch down. + // Initializing recv_replacement_stream_ with decompressed_slices removes + // all the slices from decompressed_slices leaving it empty. + new (&recv_replacement_stream_) + grpc_core::SliceBufferByteStream(&decompressed_slices, recv_flags); + recv_message_->reset(reinterpret_cast( + &recv_replacement_stream_)); + recv_message_ = nullptr; + } + ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error_)); +} + +void CallData::ContinueRecvMessageReadyCallback(grpc_error* error) { + MaybeResumeOnRecvTrailingMetadataReady(); + // The surface will clean up the receiving stream if there is an error. + grpc_closure* closure = original_recv_message_ready_; + original_recv_message_ready_ = nullptr; + grpc_core::Closure::Run(DEBUG_LOCATION, closure, error); +} + +void CallData::MaybeResumeOnRecvTrailingMetadataReady() { + if (seen_recv_trailing_metadata_ready_) { + seen_recv_trailing_metadata_ready_ = false; + grpc_error* error = on_recv_trailing_metadata_ready_error_; + on_recv_trailing_metadata_ready_error_ = GRPC_ERROR_NONE; + GRPC_CALL_COMBINER_START(call_combiner_, &on_recv_trailing_metadata_ready_, + error, "Continuing OnRecvTrailingMetadataReady"); + } +} + +void CallData::OnRecvTrailingMetadataReady(void* arg, grpc_error* error) { + CallData* calld = static_cast(arg); + if (calld->original_recv_initial_metadata_ready_ != nullptr || + calld->original_recv_message_ready_ != nullptr) { + calld->seen_recv_trailing_metadata_ready_ = true; + calld->on_recv_trailing_metadata_ready_error_ = GRPC_ERROR_REF(error); + GRPC_CALL_COMBINER_STOP( + calld->call_combiner_, + "Deferring OnRecvTrailingMetadataReady until after " + "OnRecvInitialMetadataReady and OnRecvMessageReady"); + return; + } + error = grpc_error_add_child(GRPC_ERROR_REF(error), calld->error_); + calld->error_ = GRPC_ERROR_NONE; + grpc_closure* closure = calld->original_recv_trailing_metadata_ready_; + calld->original_recv_trailing_metadata_ready_ = nullptr; + grpc_core::Closure::Run(DEBUG_LOCATION, closure, error); +} + +void CallData::DecompressStartTransportStreamOpBatch( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + // Handle recv_initial_metadata. + if (batch->recv_initial_metadata) { + recv_initial_metadata_ = + batch->payload->recv_initial_metadata.recv_initial_metadata; + original_recv_initial_metadata_ready_ = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &on_recv_initial_metadata_ready_; + } + // Handle recv_message + if (batch->recv_message) { + recv_message_ = batch->payload->recv_message.recv_message; + original_recv_message_ready_ = + batch->payload->recv_message.recv_message_ready; + batch->payload->recv_message.recv_message_ready = &on_recv_message_ready_; + } + // Handle recv_trailing_metadata + if (batch->recv_trailing_metadata) { + original_recv_trailing_metadata_ready_ = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &on_recv_trailing_metadata_ready_; + } + // Pass control down the stack. + grpc_call_next_op(elem, batch); +} + +void DecompressStartTransportStreamOpBatch( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + GPR_TIMER_SCOPE("decompress_start_transport_stream_op_batch", 0); + CallData* calld = static_cast(elem->call_data); + calld->DecompressStartTransportStreamOpBatch(elem, batch); +} + +static grpc_error* DecompressInitCallElem(grpc_call_element* elem, + const grpc_call_element_args* args) { + new (elem->call_data) CallData(*args); + return GRPC_ERROR_NONE; +} + +static void DecompressDestroyCallElem( + grpc_call_element* elem, const grpc_call_final_info* /*final_info*/, + grpc_closure* /*ignored*/) { + CallData* calld = static_cast(elem->call_data); + calld->~CallData(); +} + +static grpc_error* DecompressInitChannelElem( + grpc_channel_element* /*elem*/, grpc_channel_element_args* /*args*/) { + return GRPC_ERROR_NONE; +} + +void DecompressDestroyChannelElem(grpc_channel_element* /*elem*/) {} + +} // namespace + +const grpc_channel_filter grpc_message_decompress_filter = { + DecompressStartTransportStreamOpBatch, + grpc_channel_next_op, + sizeof(CallData), + DecompressInitCallElem, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + DecompressDestroyCallElem, + 0, // sizeof(ChannelData) + DecompressInitChannelElem, + DecompressDestroyChannelElem, + grpc_channel_next_get_info, + "message_decompress"}; diff --git a/src/core/ext/filters/http/message_compress/message_decompress_filter.h b/src/core/ext/filters/http/message_compress/message_decompress_filter.h new file mode 100644 index 00000000000..7d567bf08a2 --- /dev/null +++ b/src/core/ext/filters/http/message_compress/message_decompress_filter.h @@ -0,0 +1,29 @@ +// +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H +#define GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H + +#include + +#include "src/core/lib/channel/channel_stack.h" + +extern const grpc_channel_filter grpc_message_decompress_filter; + +#endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H \ + */ diff --git a/src/core/lib/surface/byte_buffer_reader.cc b/src/core/lib/surface/byte_buffer_reader.cc index ed8ecc49590..3689c79455e 100644 --- a/src/core/lib/surface/byte_buffer_reader.cc +++ b/src/core/lib/surface/byte_buffer_reader.cc @@ -22,73 +22,28 @@ #include #include -#include #include #include #include #include -#include "src/core/lib/compression/message_compress.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/slice/slice_internal.h" -static int is_compressed(grpc_byte_buffer* buffer) { - switch (buffer->type) { - case GRPC_BB_RAW: - if (buffer->data.raw.compression == GRPC_COMPRESS_NONE) { - return 0 /* GPR_FALSE */; - } - break; - } - return 1 /* GPR_TRUE */; -} - int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, grpc_byte_buffer* buffer) { - grpc_core::ExecCtx exec_ctx; - grpc_slice_buffer decompressed_slices_buffer; reader->buffer_in = buffer; switch (reader->buffer_in->type) { case GRPC_BB_RAW: - grpc_slice_buffer_init(&decompressed_slices_buffer); - if (is_compressed(reader->buffer_in)) { - if (grpc_msg_decompress( - - grpc_compression_algorithm_to_message_compression_algorithm( - reader->buffer_in->data.raw.compression), - &reader->buffer_in->data.raw.slice_buffer, - &decompressed_slices_buffer) == 0) { - gpr_log(GPR_ERROR, - "Unexpected error decompressing data for algorithm with enum " - "value '%d'.", - reader->buffer_in->data.raw.compression); - memset(reader, 0, sizeof(*reader)); - return 0; - } else { /* all fine */ - reader->buffer_out = - grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices, - decompressed_slices_buffer.count); - } - grpc_slice_buffer_destroy_internal(&decompressed_slices_buffer); - } else { /* not compressed, use the input buffer as output */ - reader->buffer_out = reader->buffer_in; - } + reader->buffer_out = reader->buffer_in; reader->current.index = 0; break; } - return 1; } void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader* reader) { - switch (reader->buffer_in->type) { - case GRPC_BB_RAW: - /* keeping the same if-else structure as in the init function */ - if (is_compressed(reader->buffer_in)) { - grpc_byte_buffer_destroy(reader->buffer_out); - } - break; - } + reader->buffer_out = nullptr; } int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader, diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index 6988ab843b2..ecb605ad9bb 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -26,10 +26,15 @@ #include "src/core/lib/iomgr/closure.h" /** Internal bit flag for grpc_begin_message's \a flags signaling the use of - * compression for the message */ + * compression for the message. (Does not apply for stream compression.) */ #define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u) +/** Internal bit flag for determining whether the message was compressed and had + * to be decompressed by the message_decompress filter. (Does not apply for + * stream compression.) */ +#define GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED (0x40000000u) /** Mask of all valid internal flags. */ -#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS) +#define GRPC_WRITE_INTERNAL_USED_MASK \ + (GRPC_WRITE_INTERNAL_COMPRESS | GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED) namespace grpc_core { diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 6c2dfed4ae3..4b310cc2b9c 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -82,6 +82,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/http_filters_plugin.cc', 'src/core/ext/filters/http/message_compress/message_compress_filter.cc', + 'src/core/ext/filters/http/message_compress/message_decompress_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', diff --git a/test/core/channel/minimal_stack_is_minimal_test.cc b/test/core/channel/minimal_stack_is_minimal_test.cc index bee0bfb41f2..858fd5676a1 100644 --- a/test/core/channel/minimal_stack_is_minimal_test.cc +++ b/test/core/channel/minimal_stack_is_minimal_test.cc @@ -73,13 +73,15 @@ int main(int argc, char** argv) { "authority", "connected", NULL); errors += CHECK_STACK("unknown", &minimal_stack_args, GRPC_SERVER_CHANNEL, "server", "connected", NULL); - errors += - CHECK_STACK("chttp2", &minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL, - "authority", "http-client", "connected", NULL); + errors += CHECK_STACK("chttp2", &minimal_stack_args, + GRPC_CLIENT_DIRECT_CHANNEL, "authority", "http-client", + "message_decompress", "connected", NULL); errors += CHECK_STACK("chttp2", &minimal_stack_args, GRPC_CLIENT_SUBCHANNEL, - "authority", "http-client", "connected", NULL); - errors += CHECK_STACK("chttp2", &minimal_stack_args, GRPC_SERVER_CHANNEL, - "server", "http-server", "connected", NULL); + "authority", "http-client", "message_decompress", + "connected", NULL); + errors += + CHECK_STACK("chttp2", &minimal_stack_args, GRPC_SERVER_CHANNEL, "server", + "http-server", "message_decompress", "connected", NULL); errors += CHECK_STACK(nullptr, &minimal_stack_args, GRPC_CLIENT_CHANNEL, "client-channel", NULL); @@ -91,15 +93,17 @@ int main(int argc, char** argv) { "message_size", "connected", NULL); errors += CHECK_STACK("unknown", nullptr, GRPC_SERVER_CHANNEL, "server", "message_size", "deadline", "connected", NULL); - errors += CHECK_STACK("chttp2", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, - "authority", "message_size", "deadline", "http-client", - "message_compress", "connected", NULL); + errors += + CHECK_STACK("chttp2", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, "authority", + "message_size", "deadline", "http-client", + "message_decompress", "message_compress", "connected", NULL); errors += CHECK_STACK("chttp2", nullptr, GRPC_CLIENT_SUBCHANNEL, "authority", - "message_size", "http-client", "message_compress", - "connected", NULL); - errors += CHECK_STACK("chttp2", nullptr, GRPC_SERVER_CHANNEL, "server", - "message_size", "deadline", "http-server", + "message_size", "http-client", "message_decompress", "message_compress", "connected", NULL); + errors += + CHECK_STACK("chttp2", nullptr, GRPC_SERVER_CHANNEL, "server", + "message_size", "deadline", "http-server", + "message_decompress", "message_compress", "connected", NULL); errors += CHECK_STACK(nullptr, nullptr, GRPC_CLIENT_CHANNEL, "client-channel", NULL); diff --git a/test/core/end2end/cq_verifier.cc b/test/core/end2end/cq_verifier.cc index f7e64effcd4..3d45b6d647c 100644 --- a/test/core/end2end/cq_verifier.cc +++ b/test/core/end2end/cq_verifier.cc @@ -29,6 +29,8 @@ #include #include #include +#include "src/core/lib/compression/compression_internal.h" +#include "src/core/lib/compression/message_compress.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/surface/event_string.h" @@ -145,33 +147,25 @@ int raw_byte_buffer_eq_slice(grpc_byte_buffer* rbb, grpc_slice b) { } int byte_buffer_eq_slice(grpc_byte_buffer* bb, grpc_slice b) { - grpc_byte_buffer_reader reader; - grpc_byte_buffer* rbb; - int res; - - GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, bb) && - "Couldn't init byte buffer reader"); - rbb = grpc_raw_byte_buffer_from_reader(&reader); - res = raw_byte_buffer_eq_slice(rbb, b); - grpc_byte_buffer_reader_destroy(&reader); - grpc_byte_buffer_destroy(rbb); - - return res; + if (bb->data.raw.compression > GRPC_COMPRESS_NONE) { + grpc_slice_buffer decompressed_buffer; + grpc_slice_buffer_init(&decompressed_buffer); + GPR_ASSERT(grpc_msg_decompress( + grpc_compression_algorithm_to_message_compression_algorithm( + bb->data.raw.compression), + &bb->data.raw.slice_buffer, &decompressed_buffer)); + grpc_byte_buffer* rbb = grpc_raw_byte_buffer_create( + decompressed_buffer.slices, decompressed_buffer.count); + int ret_val = raw_byte_buffer_eq_slice(rbb, b); + grpc_byte_buffer_destroy(rbb); + grpc_slice_buffer_destroy(&decompressed_buffer); + return ret_val; + } + return raw_byte_buffer_eq_slice(bb, b); } int byte_buffer_eq_string(grpc_byte_buffer* bb, const char* str) { - grpc_byte_buffer_reader reader; - grpc_byte_buffer* rbb; - int res; - - GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, bb) && - "Couldn't init byte buffer reader"); - rbb = grpc_raw_byte_buffer_from_reader(&reader); - res = raw_byte_buffer_eq_slice(rbb, grpc_slice_from_copied_string(str)); - grpc_byte_buffer_reader_destroy(&reader); - grpc_byte_buffer_destroy(rbb); - - return res; + return byte_buffer_eq_slice(bb, grpc_slice_from_copied_string(str)); } static bool is_probably_integer(void* p) { return ((uintptr_t)p) < 1000000; } diff --git a/test/core/end2end/tests/compressed_payload.cc b/test/core/end2end/tests/compressed_payload.cc index 81e6bfccd10..5400a425219 100644 --- a/test/core/end2end/tests/compressed_payload.cc +++ b/test/core/end2end/tests/compressed_payload.cc @@ -41,9 +41,12 @@ static void* tag(intptr_t t) { return (void*)t; } static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, const char* test_name, grpc_channel_args* client_args, - grpc_channel_args* server_args) { + grpc_channel_args* server_args, + bool decompress_in_core) { grpc_end2end_test_fixture f; - gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name); + gpr_log(GPR_INFO, "Running test: %s%s/%s", test_name, + decompress_in_core ? "" : "_with_decompression_disabled", + config.name); f = config.create_fixture(client_args, server_args); config.init_server(&f, server_args); config.init_client(&f, client_args); @@ -97,7 +100,8 @@ static void request_for_disabled_algorithm( uint32_t send_flags_bitmask, grpc_compression_algorithm algorithm_to_disable, grpc_compression_algorithm requested_client_compression_algorithm, - grpc_status_code expected_error, grpc_metadata* client_metadata) { + grpc_status_code expected_error, grpc_metadata* client_metadata, + bool decompress_in_core) { grpc_call* c; grpc_call* s; grpc_slice request_payload_slice; @@ -128,13 +132,24 @@ static void request_for_disabled_algorithm( nullptr, requested_client_compression_algorithm); server_args = grpc_channel_args_set_channel_default_compression_algorithm( nullptr, GRPC_COMPRESS_NONE); - { - grpc_core::ExecCtx exec_ctx; - server_args = grpc_channel_args_compression_algorithm_set_state( - &server_args, algorithm_to_disable, false); + server_args = grpc_channel_args_compression_algorithm_set_state( + &server_args, algorithm_to_disable, false); + if (!decompress_in_core) { + grpc_arg disable_decompression_in_core_arg = + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION), 0); + grpc_channel_args* old_client_args = client_args; + grpc_channel_args* old_server_args = server_args; + client_args = grpc_channel_args_copy_and_add( + client_args, &disable_decompression_in_core_arg, 1); + server_args = grpc_channel_args_copy_and_add( + server_args, &disable_decompression_in_core_arg, 1); + grpc_channel_args_destroy(old_client_args); + grpc_channel_args_destroy(old_server_args); } - f = begin_test(config, test_name, client_args, server_args); + f = begin_test(config, test_name, client_args, server_args, + decompress_in_core); cqv = cq_verifier_create(f.cq); gpr_timespec deadline = five_seconds_from_now(); @@ -253,18 +268,13 @@ static void request_for_disabled_algorithm( grpc_slice_unref(request_payload_slice); grpc_byte_buffer_destroy(request_payload); grpc_byte_buffer_destroy(request_payload_recv); - - { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(client_args); - grpc_channel_args_destroy(server_args); - } - + grpc_channel_args_destroy(client_args); + grpc_channel_args_destroy(server_args); end_test(&f); config.tear_down_data(&f); } -static void request_with_payload_template( +static void request_with_payload_template_inner( grpc_end2end_test_config config, const char* test_name, uint32_t client_send_flags_bitmask, grpc_compression_algorithm default_client_channel_compression_algorithm, @@ -273,7 +283,7 @@ static void request_with_payload_template( grpc_compression_algorithm expected_algorithm_from_server, grpc_metadata* client_init_metadata, bool set_server_level, grpc_compression_level server_compression_level, - bool send_message_before_initial_metadata) { + bool send_message_before_initial_metadata, bool decompress_in_core) { grpc_call* c; grpc_call* s; grpc_slice request_payload_slice; @@ -312,8 +322,21 @@ static void request_with_payload_template( nullptr, default_client_channel_compression_algorithm); server_args = grpc_channel_args_set_channel_default_compression_algorithm( nullptr, default_server_channel_compression_algorithm); - - f = begin_test(config, test_name, client_args, server_args); + if (!decompress_in_core) { + grpc_arg disable_decompression_in_core_arg = + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION), 0); + grpc_channel_args* old_client_args = client_args; + grpc_channel_args* old_server_args = server_args; + client_args = grpc_channel_args_copy_and_add( + client_args, &disable_decompression_in_core_arg, 1); + server_args = grpc_channel_args_copy_and_add( + server_args, &disable_decompression_in_core_arg, 1); + grpc_channel_args_destroy(old_client_args); + grpc_channel_args_destroy(old_server_args); + } + f = begin_test(config, test_name, client_args, server_args, + decompress_in_core); cqv = cq_verifier_create(f.cq); gpr_timespec deadline = five_seconds_from_now(); @@ -341,7 +364,6 @@ static void request_with_payload_template( GPR_ASSERT(GRPC_CALL_OK == error); CQ_EXPECT_COMPLETION(cqv, tag(2), true); } - memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; @@ -385,7 +407,6 @@ static void request_with_payload_template( GRPC_COMPRESS_DEFLATE) != 0); GPR_ASSERT(GPR_BITGET(grpc_call_test_only_get_encodings_accepted_by_peer(s), GRPC_COMPRESS_GZIP) != 0); - memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; @@ -406,7 +427,6 @@ static void request_with_payload_template( error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(101), nullptr); GPR_ASSERT(GRPC_CALL_OK == error); - for (int i = 0; i < 2; i++) { response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1); @@ -442,7 +462,8 @@ static void request_with_payload_template( GPR_ASSERT(request_payload_recv->type == GRPC_BB_RAW); GPR_ASSERT(byte_buffer_eq_string(request_payload_recv, request_str)); GPR_ASSERT(request_payload_recv->data.raw.compression == - expected_algorithm_from_client); + (decompress_in_core ? GRPC_COMPRESS_NONE + : expected_algorithm_from_client)); memset(ops, 0, sizeof(ops)); op = ops; @@ -475,11 +496,13 @@ static void request_with_payload_template( if (server_compression_level > GRPC_COMPRESS_LEVEL_NONE) { const grpc_compression_algorithm algo_for_server_level = grpc_call_compression_for_level(s, server_compression_level); - GPR_ASSERT(response_payload_recv->data.raw.compression == - algo_for_server_level); + GPR_ASSERT( + response_payload_recv->data.raw.compression == + (decompress_in_core ? GRPC_COMPRESS_NONE : algo_for_server_level)); } else { GPR_ASSERT(response_payload_recv->data.raw.compression == - expected_algorithm_from_server); + (decompress_in_core ? GRPC_COMPRESS_NONE + : expected_algorithm_from_server)); } grpc_byte_buffer_destroy(request_payload); @@ -487,7 +510,6 @@ static void request_with_payload_template( grpc_byte_buffer_destroy(request_payload_recv); grpc_byte_buffer_destroy(response_payload_recv); } - grpc_slice_unref(request_payload_slice); grpc_slice_unref(response_payload_slice); @@ -536,17 +558,38 @@ static void request_with_payload_template( grpc_call_unref(s); cq_verifier_destroy(cqv); - - { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(client_args); - grpc_channel_args_destroy(server_args); - } - + grpc_channel_args_destroy(client_args); + grpc_channel_args_destroy(server_args); end_test(&f); config.tear_down_data(&f); } +static void request_with_payload_template( + grpc_end2end_test_config config, const char* test_name, + uint32_t client_send_flags_bitmask, + grpc_compression_algorithm default_client_channel_compression_algorithm, + grpc_compression_algorithm default_server_channel_compression_algorithm, + grpc_compression_algorithm expected_algorithm_from_client, + grpc_compression_algorithm expected_algorithm_from_server, + grpc_metadata* client_init_metadata, bool set_server_level, + grpc_compression_level server_compression_level, + bool send_message_before_initial_metadata) { + request_with_payload_template_inner( + config, test_name, client_send_flags_bitmask, + default_client_channel_compression_algorithm, + default_server_channel_compression_algorithm, + expected_algorithm_from_client, expected_algorithm_from_server, + client_init_metadata, set_server_level, server_compression_level, + send_message_before_initial_metadata, false); + request_with_payload_template_inner( + config, test_name, client_send_flags_bitmask, + default_client_channel_compression_algorithm, + default_server_channel_compression_algorithm, + expected_algorithm_from_client, expected_algorithm_from_server, + client_init_metadata, set_server_level, server_compression_level, + send_message_before_initial_metadata, true); +} + static void test_invoke_request_with_exceptionally_uncompressed_payload( grpc_end2end_test_config config) { request_with_payload_template( @@ -634,7 +677,11 @@ static void test_invoke_request_with_disabled_algorithm( request_for_disabled_algorithm(config, "test_invoke_request_with_disabled_algorithm", 0, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, - GRPC_STATUS_UNIMPLEMENTED, nullptr); + GRPC_STATUS_UNIMPLEMENTED, nullptr, false); + request_for_disabled_algorithm(config, + "test_invoke_request_with_disabled_algorithm", + 0, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, + GRPC_STATUS_UNIMPLEMENTED, nullptr, true); } void compressed_payload(grpc_end2end_test_config config) { diff --git a/test/core/end2end/tests/workaround_cronet_compression.cc b/test/core/end2end/tests/workaround_cronet_compression.cc index 1a47244b68d..5ffb477aebc 100644 --- a/test/core/end2end/tests/workaround_cronet_compression.cc +++ b/test/core/end2end/tests/workaround_cronet_compression.cc @@ -100,8 +100,8 @@ static void request_with_payload_template( grpc_compression_algorithm expected_algorithm_from_client, grpc_compression_algorithm expected_algorithm_from_server, grpc_metadata* client_init_metadata, bool set_server_level, - grpc_compression_level server_compression_level, - char* user_agent_override) { + grpc_compression_level server_compression_level, char* user_agent_override, + bool decompress_in_core) { grpc_call* c; grpc_call* s; grpc_slice request_payload_slice; @@ -140,9 +140,21 @@ static void request_with_payload_template( nullptr, default_client_channel_compression_algorithm); server_args = grpc_channel_args_set_channel_default_compression_algorithm( nullptr, default_server_channel_compression_algorithm); + if (!decompress_in_core) { + grpc_arg disable_decompression_in_core_arg = + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION), 0); + grpc_channel_args* old_client_args = client_args; + grpc_channel_args* old_server_args = server_args; + client_args = grpc_channel_args_copy_and_add( + client_args, &disable_decompression_in_core_arg, 1); + server_args = grpc_channel_args_copy_and_add( + server_args, &disable_decompression_in_core_arg, 1); + grpc_channel_args_destroy(old_client_args); + grpc_channel_args_destroy(old_server_args); + } if (user_agent_override) { - grpc_core::ExecCtx exec_ctx; grpc_channel_args* client_args_old = client_args; grpc_arg arg; arg.key = const_cast(GRPC_ARG_PRIMARY_USER_AGENT_STRING); @@ -267,7 +279,8 @@ static void request_with_payload_template( GPR_ASSERT(request_payload_recv->type == GRPC_BB_RAW); GPR_ASSERT(byte_buffer_eq_string(request_payload_recv, request_str)); GPR_ASSERT(request_payload_recv->data.raw.compression == - expected_algorithm_from_client); + (decompress_in_core ? GRPC_COMPRESS_NONE + : expected_algorithm_from_client)); memset(ops, 0, sizeof(ops)); op = ops; @@ -288,11 +301,13 @@ static void request_with_payload_template( if (server_compression_level > GRPC_COMPRESS_LEVEL_NONE) { const grpc_compression_algorithm algo_for_server_level = grpc_call_compression_for_level(s, server_compression_level); - GPR_ASSERT(response_payload_recv->data.raw.compression == - algo_for_server_level); + GPR_ASSERT( + response_payload_recv->data.raw.compression == + (decompress_in_core ? GRPC_COMPRESS_NONE : algo_for_server_level)); } else { GPR_ASSERT(response_payload_recv->data.raw.compression == - expected_algorithm_from_server); + (decompress_in_core ? GRPC_COMPRESS_NONE + : expected_algorithm_from_server)); } grpc_byte_buffer_destroy(request_payload); @@ -349,13 +364,8 @@ static void request_with_payload_template( grpc_call_unref(s); cq_verifier_destroy(cqv); - - { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(client_args); - grpc_channel_args_destroy(server_args); - } - + grpc_channel_args_destroy(client_args); + grpc_channel_args_destroy(server_args); end_test(&f); config.tear_down_data(&f); } @@ -387,7 +397,14 @@ static void test_workaround_cronet_compression( GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, workaround_configs[i].expected_algorithm_from_server, nullptr, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE, - workaround_configs[i].user_agent_override); + workaround_configs[i].user_agent_override, true); + request_with_payload_template( + config, + "test_invoke_request_with_compressed_payload_with_compression_disabled", + 0, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, + workaround_configs[i].expected_algorithm_from_server, nullptr, false, + /* ignored */ GRPC_COMPRESS_LEVEL_NONE, + workaround_configs[i].user_agent_override, false); } } diff --git a/test/core/surface/byte_buffer_reader_test.cc b/test/core/surface/byte_buffer_reader_test.cc index fc2654426d7..47d76f49277 100644 --- a/test/core/surface/byte_buffer_reader_test.cc +++ b/test/core/surface/byte_buffer_reader_test.cc @@ -25,7 +25,6 @@ #include #include -#include "src/core/lib/compression/message_compress.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "test/core/util/test_config.h" @@ -168,75 +167,6 @@ static void test_peek_none_compressed_slice(void) { grpc_byte_buffer_destroy(buffer); } -static void test_read_corrupted_slice(void) { - grpc_slice slice; - grpc_byte_buffer* buffer; - grpc_byte_buffer_reader reader; - - LOG_TEST("test_read_corrupted_slice"); - slice = grpc_slice_from_copied_string("test"); - buffer = grpc_raw_byte_buffer_create(&slice, 1); - buffer->data.raw.compression = GRPC_COMPRESS_GZIP; /* lies! */ - grpc_slice_unref(slice); - GPR_ASSERT(!grpc_byte_buffer_reader_init(&reader, buffer)); - grpc_byte_buffer_destroy(buffer); -} - -static void read_compressed_slice(grpc_compression_algorithm algorithm, - size_t input_size) { - grpc_slice input_slice; - grpc_slice_buffer sliceb_in; - grpc_slice_buffer sliceb_out; - grpc_byte_buffer* buffer; - grpc_byte_buffer_reader reader; - grpc_slice read_slice; - size_t read_count = 0; - - grpc_slice_buffer_init(&sliceb_in); - grpc_slice_buffer_init(&sliceb_out); - - input_slice = grpc_slice_malloc(input_size); - memset(GRPC_SLICE_START_PTR(input_slice), 'a', input_size); - grpc_slice_buffer_add(&sliceb_in, input_slice); /* takes ownership */ - { - grpc_core::ExecCtx exec_ctx; - GPR_ASSERT(grpc_msg_compress( - - grpc_compression_algorithm_to_message_compression_algorithm(algorithm), - &sliceb_in, &sliceb_out)); - } - - buffer = grpc_raw_compressed_byte_buffer_create(sliceb_out.slices, - sliceb_out.count, algorithm); - GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, buffer) && - "Couldn't init byte buffer reader"); - - while (grpc_byte_buffer_reader_next(&reader, &read_slice)) { - GPR_ASSERT(memcmp(GRPC_SLICE_START_PTR(read_slice), - GRPC_SLICE_START_PTR(input_slice) + read_count, - GRPC_SLICE_LENGTH(read_slice)) == 0); - read_count += GRPC_SLICE_LENGTH(read_slice); - grpc_slice_unref(read_slice); - } - GPR_ASSERT(read_count == input_size); - grpc_byte_buffer_reader_destroy(&reader); - grpc_byte_buffer_destroy(buffer); - grpc_slice_buffer_destroy(&sliceb_out); - grpc_slice_buffer_destroy(&sliceb_in); -} - -static void test_read_gzip_compressed_slice(void) { - const size_t INPUT_SIZE = 2048; - LOG_TEST("test_read_gzip_compressed_slice"); - read_compressed_slice(GRPC_COMPRESS_GZIP, INPUT_SIZE); -} - -static void test_read_deflate_compressed_slice(void) { - const size_t INPUT_SIZE = 2048; - LOG_TEST("test_read_deflate_compressed_slice"); - read_compressed_slice(GRPC_COMPRESS_DEFLATE, INPUT_SIZE); -} - static void test_byte_buffer_from_reader(void) { grpc_slice slice; grpc_byte_buffer *buffer, *buffer_from_reader; @@ -342,9 +272,6 @@ int main(int argc, char** argv) { test_peek_one_slice(); test_peek_one_slice_malloc(); test_peek_none_compressed_slice(); - test_read_gzip_compressed_slice(); - test_read_deflate_compressed_slice(); - test_read_corrupted_slice(); test_byte_buffer_from_reader(); test_byte_buffer_copy(); test_readall(); diff --git a/test/cpp/interop/client_helper.h b/test/cpp/interop/client_helper.h index e0bb6eacd90..e97274177ab 100644 --- a/test/cpp/interop/client_helper.h +++ b/test/cpp/interop/client_helper.h @@ -27,6 +27,7 @@ #include #include "src/core/lib/surface/call_test_only.h" +#include "src/core/lib/transport/byte_stream.h" namespace grpc { namespace testing { @@ -54,8 +55,11 @@ class InteropClientContextInspector { return grpc_call_test_only_get_compression_algorithm(context_.call_); } - uint32_t GetMessageFlags() const { - return grpc_call_test_only_get_message_flags(context_.call_); + bool WasCompressed() const { + return (grpc_call_test_only_get_message_flags(context_.call_) & + GRPC_WRITE_INTERNAL_COMPRESS) || + (grpc_call_test_only_get_message_flags(context_.call_) & + GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED); } private: diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index b05e1de82ef..71ee79a6e96 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -30,7 +30,6 @@ #include #include -#include "src/core/lib/transport/byte_stream.h" #include "src/proto/grpc/testing/empty.pb.h" #include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" @@ -67,10 +66,10 @@ void UnaryCompressionChecks(const InteropClientContextInspector& inspector, "from server."); abort(); } - GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); + GPR_ASSERT(inspector.WasCompressed()); } else { // Didn't request compression -> make sure the response is uncompressed - GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); + GPR_ASSERT(!(inspector.WasCompressed())); } } } // namespace @@ -577,10 +576,10 @@ bool InteropClient::DoServerCompressedStreaming() { GPR_ASSERT(request.response_parameters(k).has_compressed()); if (request.response_parameters(k).compressed().value()) { GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE); - GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); + GPR_ASSERT(inspector.WasCompressed()); } else { // requested *no* compression. - GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); + GPR_ASSERT(!(inspector.WasCompressed())); } ++k; } diff --git a/test/cpp/interop/interop_server.cc b/test/cpp/interop/interop_server.cc index 5482e4f6759..ca3a03883df 100644 --- a/test/cpp/interop/interop_server.cc +++ b/test/cpp/interop/interop_server.cc @@ -31,7 +31,6 @@ #include #include "src/core/lib/gpr/string.h" -#include "src/core/lib/transport/byte_stream.h" #include "src/proto/grpc/testing/empty.pb.h" #include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" @@ -118,7 +117,7 @@ bool CheckExpectedCompression(const ServerContext& context, "Expected compression but got uncompressed request from client."); return false; } - if (!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)) { + if (!(inspector.WasCompressed())) { gpr_log(GPR_ERROR, "Failure: Requested compression in a compressable request, but " "compression bit in message flags not set."); @@ -126,7 +125,7 @@ bool CheckExpectedCompression(const ServerContext& context, } } else { // Didn't expect compression -> make sure the request is uncompressed - if (inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS) { + if (inspector.WasCompressed()) { gpr_log(GPR_ERROR, "Failure: Didn't requested compression, but compression bit in " "message flags set."); diff --git a/test/cpp/interop/server_helper.cc b/test/cpp/interop/server_helper.cc index 21945219982..6a3637f6bfa 100644 --- a/test/cpp/interop/server_helper.cc +++ b/test/cpp/interop/server_helper.cc @@ -24,6 +24,7 @@ #include #include "src/core/lib/surface/call_test_only.h" +#include "src/core/lib/transport/byte_stream.h" #include "test/cpp/util/test_credentials_provider.h" DECLARE_bool(use_alts); @@ -60,8 +61,11 @@ uint32_t InteropServerContextInspector::GetEncodingsAcceptedByClient() const { return grpc_call_test_only_get_encodings_accepted_by_peer(context_.call_); } -uint32_t InteropServerContextInspector::GetMessageFlags() const { - return grpc_call_test_only_get_message_flags(context_.call_); +bool InteropServerContextInspector::WasCompressed() const { + return (grpc_call_test_only_get_message_flags(context_.call_) & + GRPC_WRITE_INTERNAL_COMPRESS) || + (grpc_call_test_only_get_message_flags(context_.call_) & + GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED); } std::shared_ptr diff --git a/test/cpp/interop/server_helper.h b/test/cpp/interop/server_helper.h index 1bfbf8e474d..237c33cd0cd 100644 --- a/test/cpp/interop/server_helper.h +++ b/test/cpp/interop/server_helper.h @@ -44,7 +44,7 @@ class InteropServerContextInspector { bool IsCancelled() const; grpc_compression_algorithm GetCallCompressionAlgorithm() const; uint32_t GetEncodingsAcceptedByClient() const; - uint32_t GetMessageFlags() const; + bool WasCompressed() const; private: const ::grpc::ServerContext& context_; diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 60e99d7b5a0..a9d06ac2015 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1186,6 +1186,8 @@ src/core/ext/filters/http/client_authority_filter.h \ src/core/ext/filters/http/http_filters_plugin.cc \ src/core/ext/filters/http/message_compress/message_compress_filter.cc \ src/core/ext/filters/http/message_compress/message_compress_filter.h \ +src/core/ext/filters/http/message_compress/message_decompress_filter.cc \ +src/core/ext/filters/http/message_compress/message_decompress_filter.h \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/http/server/http_server_filter.h \ src/core/ext/filters/max_age/max_age_filter.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index fef1fd11d4a..ef9d14d18b7 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -986,6 +986,8 @@ src/core/ext/filters/http/client_authority_filter.h \ src/core/ext/filters/http/http_filters_plugin.cc \ src/core/ext/filters/http/message_compress/message_compress_filter.cc \ src/core/ext/filters/http/message_compress/message_compress_filter.h \ +src/core/ext/filters/http/message_compress/message_decompress_filter.cc \ +src/core/ext/filters/http/message_compress/message_decompress_filter.h \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/http/server/http_server_filter.h \ src/core/ext/filters/max_age/max_age_filter.cc \