From 3c34eae97e00a7db0b9e9e1505bec4cbcda4bd02 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 2 Apr 2020 18:54:18 -0700 Subject: [PATCH] Move decompression into gRPC Core --- BUILD | 2 + BUILD.gn | 2 + CMakeLists.txt | 2 + Makefile | 2 + build_autogenerated.yaml | 4 + config.m4 | 2 + config.w32 | 2 + gRPC-C++.podspec | 2 + gRPC-Core.podspec | 3 + grpc.gemspec | 2 + grpc.gyp | 2 + include/grpc/impl/codegen/grpc_types.h | 8 + package.xml | 2 + .../ext/filters/http/http_filters_plugin.cc | 13 + .../message_decompress_filter.cc | 339 ++++++++++++++++++ .../message_decompress_filter.h | 29 ++ src/core/lib/surface/call.cc | 7 + src/core/lib/surface/call.h | 4 + src/python/grpcio/grpc_core_dependencies.py | 1 + tools/doxygen/Doxyfile.c++.internal | 2 + tools/doxygen/Doxyfile.core.internal | 2 + 21 files changed, 432 insertions(+) create mode 100644 src/core/ext/filters/http/message_decompress/message_decompress_filter.cc create mode 100644 src/core/ext/filters/http/message_decompress/message_decompress_filter.h diff --git a/BUILD b/BUILD index 81de8203a03..00cbedae6e7 100644 --- a/BUILD +++ b/BUILD @@ -1193,11 +1193,13 @@ grpc_cc_library( "src/core/ext/filters/http/client/http_client_filter.cc", "src/core/ext/filters/http/http_filters_plugin.cc", "src/core/ext/filters/http/message_compress/message_compress_filter.cc", + "src/core/ext/filters/http/message_decompress/message_decompress_filter.cc", "src/core/ext/filters/http/server/http_server_filter.cc", ], hdrs = [ "src/core/ext/filters/http/client/http_client_filter.h", "src/core/ext/filters/http/message_compress/message_compress_filter.h", + "src/core/ext/filters/http/message_decompress/message_decompress_filter.h", "src/core/ext/filters/http/server/http_server_filter.h", ], language = "c++", diff --git a/BUILD.gn b/BUILD.gn index 129ad2b2639..94ba02bed75 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -318,6 +318,8 @@ config("grpc_config") { "src/core/ext/filters/http/http_filters_plugin.cc", "src/core/ext/filters/http/message_compress/message_compress_filter.cc", "src/core/ext/filters/http/message_compress/message_compress_filter.h", + "src/core/ext/filters/http/message_decompress/message_decompress_filter.cc", + "src/core/ext/filters/http/message_decompress/message_decompress_filter.h", "src/core/ext/filters/http/server/http_server_filter.cc", "src/core/ext/filters/http/server/http_server_filter.h", "src/core/ext/filters/max_age/max_age_filter.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index 27bf340b65a..bcd98a9dd73 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1369,6 +1369,7 @@ add_library(grpc src/core/ext/filters/http/client_authority_filter.cc src/core/ext/filters/http/http_filters_plugin.cc src/core/ext/filters/http/message_compress/message_compress_filter.cc + src/core/ext/filters/http/message_decompress/message_decompress_filter.cc src/core/ext/filters/http/server/http_server_filter.cc src/core/ext/filters/max_age/max_age_filter.cc src/core/ext/filters/message_size/message_size_filter.cc @@ -2028,6 +2029,7 @@ add_library(grpc_unsecure src/core/ext/filters/http/client_authority_filter.cc src/core/ext/filters/http/http_filters_plugin.cc src/core/ext/filters/http/message_compress/message_compress_filter.cc + src/core/ext/filters/http/message_decompress/message_decompress_filter.cc src/core/ext/filters/http/server/http_server_filter.cc src/core/ext/filters/max_age/max_age_filter.cc src/core/ext/filters/message_size/message_size_filter.cc diff --git a/Makefile b/Makefile index d6c8cd19efa..b99620e7207 100644 --- a/Makefile +++ b/Makefile @@ -3694,6 +3694,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/http_filters_plugin.cc \ src/core/ext/filters/http/message_compress/message_compress_filter.cc \ + src/core/ext/filters/http/message_decompress/message_decompress_filter.cc \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/max_age/max_age_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \ @@ -4327,6 +4328,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/http_filters_plugin.cc \ src/core/ext/filters/http/message_compress/message_compress_filter.cc \ + src/core/ext/filters/http/message_decompress/message_decompress_filter.cc \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/max_age/max_age_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 30dba593d4f..e7fc808d798 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -423,6 +423,7 @@ libs: - src/core/ext/filters/http/client/http_client_filter.h - src/core/ext/filters/http/client_authority_filter.h - src/core/ext/filters/http/message_compress/message_compress_filter.h + - src/core/ext/filters/http/message_decompress/message_decompress_filter.h - src/core/ext/filters/http/server/http_server_filter.h - src/core/ext/filters/max_age/max_age_filter.h - src/core/ext/filters/message_size/message_size_filter.h @@ -795,6 +796,7 @@ libs: - src/core/ext/filters/http/client_authority_filter.cc - src/core/ext/filters/http/http_filters_plugin.cc - src/core/ext/filters/http/message_compress/message_compress_filter.cc + - src/core/ext/filters/http/message_decompress/message_decompress_filter.cc - src/core/ext/filters/http/server/http_server_filter.cc - src/core/ext/filters/max_age/max_age_filter.cc - src/core/ext/filters/message_size/message_size_filter.cc @@ -1325,6 +1327,7 @@ libs: - src/core/ext/filters/http/client/http_client_filter.h - src/core/ext/filters/http/client_authority_filter.h - src/core/ext/filters/http/message_compress/message_compress_filter.h + - src/core/ext/filters/http/message_decompress/message_decompress_filter.h - src/core/ext/filters/http/server/http_server_filter.h - src/core/ext/filters/max_age/max_age_filter.h - src/core/ext/filters/message_size/message_size_filter.h @@ -1632,6 +1635,7 @@ libs: - src/core/ext/filters/http/client_authority_filter.cc - src/core/ext/filters/http/http_filters_plugin.cc - src/core/ext/filters/http/message_compress/message_compress_filter.cc + - src/core/ext/filters/http/message_decompress/message_decompress_filter.cc - src/core/ext/filters/http/server/http_server_filter.cc - src/core/ext/filters/max_age/max_age_filter.cc - src/core/ext/filters/message_size/message_size_filter.cc diff --git a/config.m4 b/config.m4 index 6b450315f4f..4a252459015 100644 --- a/config.m4 +++ b/config.m4 @@ -104,6 +104,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/http_filters_plugin.cc \ src/core/ext/filters/http/message_compress/message_compress_filter.cc \ + src/core/ext/filters/http/message_decompress/message_decompress_filter.cc \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/max_age/max_age_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \ @@ -841,6 +842,7 @@ if test "$PHP_GRPC" != "no"; then PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http/client) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http/message_compress) + PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http/message_decompress) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http/server) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/max_age) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/message_size) diff --git a/config.w32 b/config.w32 index 59cdbdb1ce4..afdf09c9c39 100644 --- a/config.w32 +++ b/config.w32 @@ -73,6 +73,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\http\\client_authority_filter.cc " + "src\\core\\ext\\filters\\http\\http_filters_plugin.cc " + "src\\core\\ext\\filters\\http\\message_compress\\message_compress_filter.cc " + + "src\\core\\ext\\filters\\http\\message_decompress\\message_decompress_filter.cc " + "src\\core\\ext\\filters\\http\\server\\http_server_filter.cc " + "src\\core\\ext\\filters\\max_age\\max_age_filter.cc " + "src\\core\\ext\\filters\\message_size\\message_size_filter.cc " + @@ -842,6 +843,7 @@ if (PHP_GRPC != "no") { FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http\\client"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http\\message_compress"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http\\message_decompress"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http\\server"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\max_age"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\message_size"); diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 77980999b59..bda48ecdd6d 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -274,6 +274,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/http/client/http_client_filter.h', 'src/core/ext/filters/http/client_authority_filter.h', 'src/core/ext/filters/http/message_compress/message_compress_filter.h', + 'src/core/ext/filters/http/message_decompress/message_decompress_filter.h', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/max_age/max_age_filter.h', 'src/core/ext/filters/message_size/message_size_filter.h', @@ -725,6 +726,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/http/client/http_client_filter.h', 'src/core/ext/filters/http/client_authority_filter.h', 'src/core/ext/filters/http/message_compress/message_compress_filter.h', + 'src/core/ext/filters/http/message_decompress/message_decompress_filter.h', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/max_age/max_age_filter.h', 'src/core/ext/filters/message_size/message_size_filter.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 749e77b3fa7..b75f502128d 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -301,6 +301,8 @@ Pod::Spec.new do |s| 'src/core/ext/filters/http/http_filters_plugin.cc', 'src/core/ext/filters/http/message_compress/message_compress_filter.cc', 'src/core/ext/filters/http/message_compress/message_compress_filter.h', + 'src/core/ext/filters/http/message_decompress/message_decompress_filter.cc', + 'src/core/ext/filters/http/message_decompress/message_decompress_filter.h', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/max_age/max_age_filter.cc', @@ -1078,6 +1080,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/http/client/http_client_filter.h', 'src/core/ext/filters/http/client_authority_filter.h', 'src/core/ext/filters/http/message_compress/message_compress_filter.h', + 'src/core/ext/filters/http/message_decompress/message_decompress_filter.h', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/max_age/max_age_filter.h', 'src/core/ext/filters/message_size/message_size_filter.h', diff --git a/grpc.gemspec b/grpc.gemspec index 29e5d3d436b..e902a7c29fc 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -223,6 +223,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/http/http_filters_plugin.cc ) s.files += %w( src/core/ext/filters/http/message_compress/message_compress_filter.cc ) s.files += %w( src/core/ext/filters/http/message_compress/message_compress_filter.h ) + s.files += %w( src/core/ext/filters/http/message_decompress/message_decompress_filter.cc ) + s.files += %w( src/core/ext/filters/http/message_decompress/message_decompress_filter.h ) s.files += %w( src/core/ext/filters/http/server/http_server_filter.cc ) s.files += %w( src/core/ext/filters/http/server/http_server_filter.h ) s.files += %w( src/core/ext/filters/max_age/max_age_filter.cc ) diff --git a/grpc.gyp b/grpc.gyp index a62c034d65a..458eae5e114 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -497,6 +497,7 @@ 'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/http_filters_plugin.cc', 'src/core/ext/filters/http/message_compress/message_compress_filter.cc', + 'src/core/ext/filters/http/message_decompress/message_decompress_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', @@ -992,6 +993,7 @@ 'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/http_filters_plugin.cc', 'src/core/ext/filters/http/message_compress/message_compress_filter.cc', + 'src/core/ext/filters/http/message_decompress/message_decompress_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index ff45450f3a3..c1624ad9b7c 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -174,6 +174,14 @@ typedef struct { /** Enable/disable support for per-message compression. Defaults to 1, unless GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0. */ #define GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION "grpc.per_message_compression" +/** Experimental Arg. Enable/disable support for per-message decompression. + Defaults to 1, unless GRPC_ARG_MINIMAL_STACK is enabled, in which case it + defaults to 0. If disabled, decompression will be performed lazily by + grpc_byte_buffer_reader. This arg also determines whether max message limits + will be applied to the decompressed buffer or the non-decompressed buffer. It + is recommended to keep this enabled to protect against zip bomb attacks. */ +#define GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION \ + "grpc.per_message_decompression" /** Enable/disable support for deadline checking. Defaults to 1, unless GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0 */ #define GRPC_ARG_ENABLE_DEADLINE_CHECKS "grpc.enable_deadline_checking" diff --git a/package.xml b/package.xml index 06d1669ec48..5391f709cd3 100644 --- a/package.xml +++ b/package.xml @@ -203,6 +203,8 @@ + + diff --git a/src/core/ext/filters/http/http_filters_plugin.cc b/src/core/ext/filters/http/http_filters_plugin.cc index f03fa0141df..59749d54546 100644 --- a/src/core/ext/filters/http/http_filters_plugin.cc +++ b/src/core/ext/filters/http/http_filters_plugin.cc @@ -22,6 +22,7 @@ #include "src/core/ext/filters/http/client/http_client_filter.h" #include "src/core/ext/filters/http/message_compress/message_compress_filter.h" +#include "src/core/ext/filters/http/message_decompress/message_decompress_filter.h" #include "src/core/ext/filters/http/server/http_server_filter.h" #include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/surface/call.h" @@ -36,6 +37,9 @@ typedef struct { static optional_filter compress_filter = { &grpc_message_compress_filter, GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION}; +static optional_filter decompress_filter = { + &grpc_message_decompress_filter, GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION}; + static bool is_building_http_like_transport( grpc_channel_stack_builder* builder) { grpc_transport* t = grpc_channel_stack_builder_get_transport(builder); @@ -75,6 +79,15 @@ void grpc_http_filters_init(void) { grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, maybe_add_optional_filter, &compress_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_optional_filter, &decompress_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_optional_filter, &decompress_filter); + grpc_channel_init_register_stage( + GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_optional_filter, &decompress_filter); grpc_channel_init_register_stage( GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, maybe_add_required_filter, (void*)&grpc_http_client_filter); diff --git a/src/core/ext/filters/http/message_decompress/message_decompress_filter.cc b/src/core/ext/filters/http/message_decompress/message_decompress_filter.cc new file mode 100644 index 00000000000..c6a5f8d2847 --- /dev/null +++ b/src/core/ext/filters/http/message_decompress/message_decompress_filter.cc @@ -0,0 +1,339 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include +#include + +#include +#include +#include +#include + +#include "src/core/ext/filters/http/message_decompress/message_decompress_filter.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/compression/algorithm_metadata.h" +#include "src/core/lib/compression/compression_args.h" +#include "src/core/lib/compression/compression_internal.h" +#include "src/core/lib/compression/message_compress.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/surface/call.h" +#include "src/core/lib/transport/static_metadata.h" + +namespace { + +class ChannelData {}; + +class CallData { + public: + CallData(const grpc_call_element_args& args) + : call_combiner_(args.call_combiner) { + // Initialize state for recv_initial_metadata_ready callback + GRPC_CLOSURE_INIT(&on_recv_initial_metadata_ready_, + OnRecvInitialMetadataReady, this, + grpc_schedule_on_exec_ctx); + // Initialize state for recv_message_ready callback + grpc_slice_buffer_init(&recv_slices_); + GRPC_CLOSURE_INIT(&on_recv_message_next_done_, OnRecvMessageNextDone, this, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_recv_message_ready_, OnRecvMessageReady, this, + grpc_schedule_on_exec_ctx); + // Initialize state for recv_trailing_metadata_ready callback + GRPC_CLOSURE_INIT(&on_recv_trailing_metadata_ready_, + OnRecvTrailingMetadataReady, this, + grpc_schedule_on_exec_ctx); + } + + static void DecompressStartTransportStreamOpBatch( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch); + + static void OnRecvInitialMetadataReady(void* arg, grpc_error* error); + + // Methods for processing a receive message event + void MaybeResumeOnRecvMessageReady(); + static void OnRecvMessageReady(void* arg, grpc_error* error); + static void OnRecvMessageNextDone(void* arg, grpc_error* error); + grpc_error* PullSliceFromRecvMessage(); + void ContinueReadingRecvMessage(); + void FinishRecvMessage(); + void ContinueRecvMessageReadyCallback(grpc_error* error); + + // Methods for processing a recv_trailing_metadata event + void MaybeResumeOnRecvTrailingMetadataReady(); + static void OnRecvTrailingMetadataReady(void* arg, grpc_error* error); + + private: + grpc_core::CallCombiner* call_combiner_ = nullptr; + // Overall error for the call + grpc_error* error_ = GRPC_ERROR_NONE; + // Fields for handling recv_initial_metadata_ready callback + grpc_closure on_recv_initial_metadata_ready_; + grpc_closure* original_recv_initial_metadata_ready_ = nullptr; + grpc_metadata_batch* recv_initial_metadata_ = nullptr; + // Fields for handling recv_message_ready callback + bool seen_recv_message_ready_ = false; + grpc_message_compression_algorithm algorithm_ = GRPC_MESSAGE_COMPRESS_NONE; + grpc_closure on_recv_message_ready_; + grpc_closure* original_recv_message_ready_ = nullptr; + grpc_closure on_recv_message_next_done_; + grpc_core::OrphanablePtr* recv_message_ = nullptr; + // recv_slices_ holds the slices read from the original recv_message stream. + // It is initialized during construction and reset when a new stream is + // created using it. + grpc_slice_buffer recv_slices_; + grpc_core::ManualConstructor + recv_replacement_stream_; + // Fields for handling recv_trailing_metadata_ready callback + bool seen_recv_trailing_metadata_ready_ = false; + grpc_closure on_recv_trailing_metadata_ready_; + grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; + grpc_error* on_recv_trailing_metadata_ready_error_ = GRPC_ERROR_NONE; +}; + +grpc_message_compression_algorithm DecodeMessageCompressionAlgorithm( + grpc_mdelem md) { + grpc_message_compression_algorithm algorithm = + grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md)); + if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) { + char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + gpr_log(GPR_ERROR, + "Invalid incoming message compression algorithm: '%s'. " + "Interpreting incoming data as uncompressed.", + md_c_str); + gpr_free(md_c_str); + return GRPC_MESSAGE_COMPRESS_NONE; + } + return algorithm; +} + +void CallData::OnRecvInitialMetadataReady(void* arg, grpc_error* error) { + CallData* calld = static_cast(arg); + if (error == GRPC_ERROR_NONE) { + grpc_linked_mdelem* grpc_encoding = + calld->recv_initial_metadata_->idx.named.grpc_encoding; + if (grpc_encoding != nullptr) { + calld->algorithm_ = DecodeMessageCompressionAlgorithm(grpc_encoding->md); + grpc_metadata_batch_remove(calld->recv_initial_metadata_, + GRPC_BATCH_GRPC_ENCODING); + } + } + calld->MaybeResumeOnRecvMessageReady(); + calld->MaybeResumeOnRecvTrailingMetadataReady(); + grpc_closure* closure = calld->original_recv_initial_metadata_ready_; + calld->original_recv_initial_metadata_ready_ = nullptr; + grpc_core::Closure::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(error)); +} + +void CallData::MaybeResumeOnRecvMessageReady() { + if (seen_recv_message_ready_) { + seen_recv_message_ready_ = false; + GRPC_CALL_COMBINER_START(call_combiner_, &on_recv_message_ready_, + GRPC_ERROR_NONE, + "continue recv_message_ready callback"); + } +} + +void CallData::OnRecvMessageReady(void* arg, grpc_error* error) { + CallData* calld = static_cast(arg); + if (error == GRPC_ERROR_NONE && + calld->algorithm_ != GRPC_MESSAGE_COMPRESS_NONE) { + // recv_message can be NULL if trailing metadata is received instead of + // message. + if (*calld->recv_message_ == nullptr || + (*calld->recv_message_)->length() == 0) { + calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_NONE); + return; + } + if (calld->original_recv_initial_metadata_ready_ != nullptr) { + calld->seen_recv_message_ready_ = true; + GRPC_CALL_COMBINER_STOP(calld->call_combiner_, + "Deferring OnRecvMessageReady until after " + "OnRecvInitialMetadataReady"); + return; + } + calld->ContinueReadingRecvMessage(); + } else { + calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error)); + } +} + +void CallData::ContinueReadingRecvMessage() { + while ((*recv_message_) + ->Next(~static_cast(0), &on_recv_message_next_done_)) { + grpc_error* error = PullSliceFromRecvMessage(); + if (error != GRPC_ERROR_NONE) { + return ContinueRecvMessageReadyCallback(error); + } + // We have read the entire message. + if (recv_slices_.length == (*recv_message_)->length()) { + return FinishRecvMessage(); + } + } +} + +grpc_error* CallData::PullSliceFromRecvMessage() { + grpc_slice incoming_slice; + grpc_error* error = (*recv_message_)->Pull(&incoming_slice); + if (error == GRPC_ERROR_NONE) { + grpc_slice_buffer_add(&recv_slices_, incoming_slice); + } + return error; +} + +void CallData::OnRecvMessageNextDone(void* arg, grpc_error* error) { + CallData* calld = static_cast(arg); + if (error != GRPC_ERROR_NONE) { + return calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error)); + } + error = calld->PullSliceFromRecvMessage(); + if (error != GRPC_ERROR_NONE) { + return calld->ContinueRecvMessageReadyCallback(error); + } + if (calld->recv_slices_.length == (*calld->recv_message_)->length()) { + calld->FinishRecvMessage(); + } else { + calld->ContinueReadingRecvMessage(); + } +} + +void CallData::FinishRecvMessage() { + grpc_slice_buffer decompressed_slices; + if (grpc_msg_decompress(algorithm_, &recv_slices_, &decompressed_slices) == + 0) { + gpr_log( + GPR_ERROR, + "Unexpected error decompressing data for algorithm with enum value %d", + algorithm_); + } + uint32_t recv_flags = (*recv_message_)->flags(); + // Swap out the original receive byte stream with our new one and send the + // batch down. + recv_replacement_stream_.Init(&recv_slices_, recv_flags); + recv_message_->reset(recv_replacement_stream_.get()); + recv_message_ = nullptr; + ContinueRecvMessageReadyCallback(GRPC_ERROR_NONE); +} + +void CallData::ContinueRecvMessageReadyCallback(grpc_error* error) { + MaybeResumeOnRecvTrailingMetadataReady(); + // The surface will clean up the receiving stream if there is an error. + grpc_closure* closure = original_recv_message_ready_; + original_recv_message_ready_ = nullptr; + grpc_core::Closure::Run(DEBUG_LOCATION, closure, error); +} + +void CallData::MaybeResumeOnRecvTrailingMetadataReady() { + if (seen_recv_trailing_metadata_ready_) { + seen_recv_trailing_metadata_ready_ = false; + grpc_error* error = on_recv_trailing_metadata_ready_error_; + on_recv_trailing_metadata_ready_error_ = GRPC_ERROR_NONE; + GRPC_CALL_COMBINER_START(call_combiner_, &on_recv_trailing_metadata_ready_, + error, "Continuing OnRecvTrailingMetadataReady"); + } +} + +void CallData::OnRecvTrailingMetadataReady(void* arg, grpc_error* error) { + CallData* calld = static_cast(arg); + if (calld->original_recv_initial_metadata_ready_ != nullptr || + calld->original_recv_message_ready_ != nullptr) { + calld->seen_recv_trailing_metadata_ready_ = true; + calld->on_recv_trailing_metadata_ready_error_ = GRPC_ERROR_REF(error); + GRPC_CALL_COMBINER_STOP( + calld->call_combiner_, + "Deferring OnRecvTrailingMetadataReady until after " + "OnRecvInitialMetadataReady and OnRecvMessageReady"); + return; + } + error = grpc_error_add_child(GRPC_ERROR_REF(error), calld->error_); + calld->error_ = GRPC_ERROR_NONE; + grpc_closure* closure = calld->original_recv_trailing_metadata_ready_; + calld->original_recv_trailing_metadata_ready_ = nullptr; + grpc_core::Closure::Run(DEBUG_LOCATION, closure, error); +} + +void CallData::DecompressStartTransportStreamOpBatch( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + GPR_TIMER_SCOPE("compress_start_transport_stream_op_batch", 0); + CallData* calld = static_cast(elem->call_data); + // Handle recv_initial_metadata. + if (batch->recv_initial_metadata) { + calld->recv_initial_metadata_ = + batch->payload->recv_initial_metadata.recv_initial_metadata; + calld->original_recv_initial_metadata_ready_ = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->on_recv_initial_metadata_ready_; + } + // Handle recv_message + if (batch->recv_message) { + calld->recv_message_ = batch->payload->recv_message.recv_message; + calld->original_recv_message_ready_ = + batch->payload->recv_message.recv_message_ready; + batch->payload->recv_message.recv_message_ready = + &calld->on_recv_message_ready_; + } + // Handle recv_trailing_metadata + if (batch->recv_trailing_metadata) { + calld->original_recv_trailing_metadata_ready_ = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &calld->on_recv_trailing_metadata_ready_; + } + // Pass control down the stack. + grpc_call_next_op(elem, batch); +} + +static grpc_error* DecompressInitCallElem(grpc_call_element* elem, + const grpc_call_element_args* args) { + new (elem->call_data) CallData(*args); + return GRPC_ERROR_NONE; +} + +static void DecompressDestroyCallElem( + grpc_call_element* elem, const grpc_call_final_info* /*final_info*/, + grpc_closure* /*ignored*/) { + CallData* calld = static_cast(elem->call_data); + calld->~CallData(); +} + +static grpc_error* DecompressInitChannelElem( + grpc_channel_element* /*elem*/, grpc_channel_element_args* /*args*/) { + return GRPC_ERROR_NONE; +} + +void DecompressDestroyChannelElem(grpc_channel_element* /*elem*/) { return; } + +} // namespace + +const grpc_channel_filter grpc_message_decompress_filter = { + CallData::DecompressStartTransportStreamOpBatch, + grpc_channel_next_op, + sizeof(CallData), + DecompressInitCallElem, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + DecompressDestroyCallElem, + 0, // sizeof(ChannelData) + DecompressInitChannelElem, + DecompressDestroyChannelElem, + grpc_channel_next_get_info, + "message_decompress"}; diff --git a/src/core/ext/filters/http/message_decompress/message_decompress_filter.h b/src/core/ext/filters/http/message_decompress/message_decompress_filter.h new file mode 100644 index 00000000000..5e6b01d5cbb --- /dev/null +++ b/src/core/ext/filters/http/message_decompress/message_decompress_filter.h @@ -0,0 +1,29 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_DECOMPRESS_MESSAGE_DECOMPRESS_FILTER_H +#define GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_DECOMPRESS_MESSAGE_DECOMPRESS_FILTER_H + +#include + +#include "src/core/lib/channel/channel_stack.h" + +extern const grpc_channel_filter grpc_message_decompress_filter; + +#endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_DECOMPRESS_MESSAGE_DECOMPRESS_FILTER_H \ + */ diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index fcebe9bc410..339b2438c03 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -200,6 +200,8 @@ struct grpc_call { /* Stream compression algorithm for *incoming* data */ grpc_stream_compression_algorithm incoming_stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE; + /* Maximum size for uncompressed receive message in bytes. -1 for unlimited */ + int max_uncompressed_receive_message_length = -1; /* Supported encodings (compression algorithms), a bitset. * Always support no compression. */ uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE; @@ -497,6 +499,11 @@ void grpc_call_set_completion_queue(grpc_call* call, &call->pollent); } +void grpc_call_set_max_uncompressed_receive_message_length(grpc_call* call, + int limit) { + call->max_uncompressed_receive_message_length = limit; +} + #ifndef NDEBUG #define REF_REASON reason #define REF_ARG , const char* reason diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index a33664af6a9..351e57d75cb 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -59,6 +59,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, void grpc_call_set_completion_queue(grpc_call* call, grpc_completion_queue* cq); +/* Sets the max uncompressed receive message length for the call. */ +void grpc_call_set_max_uncompressed_receive_message_length(grpc_call* call, + int limit); + #ifndef NDEBUG void grpc_call_internal_ref(grpc_call* call, const char* reason); void grpc_call_internal_unref(grpc_call* call, const char* reason); diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 6c2dfed4ae3..607eb74c354 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -82,6 +82,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/http_filters_plugin.cc', 'src/core/ext/filters/http/message_compress/message_compress_filter.cc', + 'src/core/ext/filters/http/message_decompress/message_decompress_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 60e99d7b5a0..3e9f42468a0 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1186,6 +1186,8 @@ src/core/ext/filters/http/client_authority_filter.h \ src/core/ext/filters/http/http_filters_plugin.cc \ src/core/ext/filters/http/message_compress/message_compress_filter.cc \ src/core/ext/filters/http/message_compress/message_compress_filter.h \ +src/core/ext/filters/http/message_decompress/message_decompress_filter.cc \ +src/core/ext/filters/http/message_decompress/message_decompress_filter.h \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/http/server/http_server_filter.h \ src/core/ext/filters/max_age/max_age_filter.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index fef1fd11d4a..8b6f9a7cb3c 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -986,6 +986,8 @@ src/core/ext/filters/http/client_authority_filter.h \ src/core/ext/filters/http/http_filters_plugin.cc \ src/core/ext/filters/http/message_compress/message_compress_filter.cc \ src/core/ext/filters/http/message_compress/message_compress_filter.h \ +src/core/ext/filters/http/message_decompress/message_decompress_filter.cc \ +src/core/ext/filters/http/message_decompress/message_decompress_filter.h \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/http/server/http_server_filter.h \ src/core/ext/filters/max_age/max_age_filter.cc \