diff --git a/BUILD b/BUILD
index 81de8203a03..00cbedae6e7 100644
--- a/BUILD
+++ b/BUILD
@@ -1193,11 +1193,13 @@ grpc_cc_library(
"src/core/ext/filters/http/client/http_client_filter.cc",
"src/core/ext/filters/http/http_filters_plugin.cc",
"src/core/ext/filters/http/message_compress/message_compress_filter.cc",
+ "src/core/ext/filters/http/message_decompress/message_decompress_filter.cc",
"src/core/ext/filters/http/server/http_server_filter.cc",
],
hdrs = [
"src/core/ext/filters/http/client/http_client_filter.h",
"src/core/ext/filters/http/message_compress/message_compress_filter.h",
+ "src/core/ext/filters/http/message_decompress/message_decompress_filter.h",
"src/core/ext/filters/http/server/http_server_filter.h",
],
language = "c++",
diff --git a/BUILD.gn b/BUILD.gn
index 129ad2b2639..94ba02bed75 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -318,6 +318,8 @@ config("grpc_config") {
"src/core/ext/filters/http/http_filters_plugin.cc",
"src/core/ext/filters/http/message_compress/message_compress_filter.cc",
"src/core/ext/filters/http/message_compress/message_compress_filter.h",
+ "src/core/ext/filters/http/message_decompress/message_decompress_filter.cc",
+ "src/core/ext/filters/http/message_decompress/message_decompress_filter.h",
"src/core/ext/filters/http/server/http_server_filter.cc",
"src/core/ext/filters/http/server/http_server_filter.h",
"src/core/ext/filters/max_age/max_age_filter.cc",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 27bf340b65a..bcd98a9dd73 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1369,6 +1369,7 @@ add_library(grpc
src/core/ext/filters/http/client_authority_filter.cc
src/core/ext/filters/http/http_filters_plugin.cc
src/core/ext/filters/http/message_compress/message_compress_filter.cc
+ src/core/ext/filters/http/message_decompress/message_decompress_filter.cc
src/core/ext/filters/http/server/http_server_filter.cc
src/core/ext/filters/max_age/max_age_filter.cc
src/core/ext/filters/message_size/message_size_filter.cc
@@ -2028,6 +2029,7 @@ add_library(grpc_unsecure
src/core/ext/filters/http/client_authority_filter.cc
src/core/ext/filters/http/http_filters_plugin.cc
src/core/ext/filters/http/message_compress/message_compress_filter.cc
+ src/core/ext/filters/http/message_decompress/message_decompress_filter.cc
src/core/ext/filters/http/server/http_server_filter.cc
src/core/ext/filters/max_age/max_age_filter.cc
src/core/ext/filters/message_size/message_size_filter.cc
diff --git a/Makefile b/Makefile
index d6c8cd19efa..b99620e7207 100644
--- a/Makefile
+++ b/Makefile
@@ -3694,6 +3694,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/http/client_authority_filter.cc \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.cc \
+ src/core/ext/filters/http/message_decompress/message_decompress_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/max_age/max_age_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
@@ -4327,6 +4328,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/http/client_authority_filter.cc \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.cc \
+ src/core/ext/filters/http/message_decompress/message_decompress_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/max_age/max_age_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 30dba593d4f..e7fc808d798 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -423,6 +423,7 @@ libs:
- src/core/ext/filters/http/client/http_client_filter.h
- src/core/ext/filters/http/client_authority_filter.h
- src/core/ext/filters/http/message_compress/message_compress_filter.h
+ - src/core/ext/filters/http/message_decompress/message_decompress_filter.h
- src/core/ext/filters/http/server/http_server_filter.h
- src/core/ext/filters/max_age/max_age_filter.h
- src/core/ext/filters/message_size/message_size_filter.h
@@ -795,6 +796,7 @@ libs:
- src/core/ext/filters/http/client_authority_filter.cc
- src/core/ext/filters/http/http_filters_plugin.cc
- src/core/ext/filters/http/message_compress/message_compress_filter.cc
+ - src/core/ext/filters/http/message_decompress/message_decompress_filter.cc
- src/core/ext/filters/http/server/http_server_filter.cc
- src/core/ext/filters/max_age/max_age_filter.cc
- src/core/ext/filters/message_size/message_size_filter.cc
@@ -1325,6 +1327,7 @@ libs:
- src/core/ext/filters/http/client/http_client_filter.h
- src/core/ext/filters/http/client_authority_filter.h
- src/core/ext/filters/http/message_compress/message_compress_filter.h
+ - src/core/ext/filters/http/message_decompress/message_decompress_filter.h
- src/core/ext/filters/http/server/http_server_filter.h
- src/core/ext/filters/max_age/max_age_filter.h
- src/core/ext/filters/message_size/message_size_filter.h
@@ -1632,6 +1635,7 @@ libs:
- src/core/ext/filters/http/client_authority_filter.cc
- src/core/ext/filters/http/http_filters_plugin.cc
- src/core/ext/filters/http/message_compress/message_compress_filter.cc
+ - src/core/ext/filters/http/message_decompress/message_decompress_filter.cc
- src/core/ext/filters/http/server/http_server_filter.cc
- src/core/ext/filters/max_age/max_age_filter.cc
- src/core/ext/filters/message_size/message_size_filter.cc
diff --git a/config.m4 b/config.m4
index 6b450315f4f..4a252459015 100644
--- a/config.m4
+++ b/config.m4
@@ -104,6 +104,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/http/client_authority_filter.cc \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.cc \
+ src/core/ext/filters/http/message_decompress/message_decompress_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/max_age/max_age_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
@@ -841,6 +842,7 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http/client)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http/message_compress)
+ PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http/message_decompress)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/http/server)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/max_age)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/message_size)
diff --git a/config.w32 b/config.w32
index 59cdbdb1ce4..afdf09c9c39 100644
--- a/config.w32
+++ b/config.w32
@@ -73,6 +73,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\http\\client_authority_filter.cc " +
"src\\core\\ext\\filters\\http\\http_filters_plugin.cc " +
"src\\core\\ext\\filters\\http\\message_compress\\message_compress_filter.cc " +
+ "src\\core\\ext\\filters\\http\\message_decompress\\message_decompress_filter.cc " +
"src\\core\\ext\\filters\\http\\server\\http_server_filter.cc " +
"src\\core\\ext\\filters\\max_age\\max_age_filter.cc " +
"src\\core\\ext\\filters\\message_size\\message_size_filter.cc " +
@@ -842,6 +843,7 @@ if (PHP_GRPC != "no") {
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http\\client");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http\\message_compress");
+ FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http\\message_decompress");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\http\\server");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\max_age");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\message_size");
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 77980999b59..bda48ecdd6d 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -274,6 +274,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/client/http_client_filter.h',
'src/core/ext/filters/http/client_authority_filter.h',
'src/core/ext/filters/http/message_compress/message_compress_filter.h',
+ 'src/core/ext/filters/http/message_decompress/message_decompress_filter.h',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/max_age/max_age_filter.h',
'src/core/ext/filters/message_size/message_size_filter.h',
@@ -725,6 +726,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/client/http_client_filter.h',
'src/core/ext/filters/http/client_authority_filter.h',
'src/core/ext/filters/http/message_compress/message_compress_filter.h',
+ 'src/core/ext/filters/http/message_decompress/message_decompress_filter.h',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/max_age/max_age_filter.h',
'src/core/ext/filters/message_size/message_size_filter.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 749e77b3fa7..b75f502128d 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -301,6 +301,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/message_compress_filter.cc',
'src/core/ext/filters/http/message_compress/message_compress_filter.h',
+ 'src/core/ext/filters/http/message_decompress/message_decompress_filter.cc',
+ 'src/core/ext/filters/http/message_decompress/message_decompress_filter.h',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/max_age/max_age_filter.cc',
@@ -1078,6 +1080,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/client/http_client_filter.h',
'src/core/ext/filters/http/client_authority_filter.h',
'src/core/ext/filters/http/message_compress/message_compress_filter.h',
+ 'src/core/ext/filters/http/message_decompress/message_decompress_filter.h',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/max_age/max_age_filter.h',
'src/core/ext/filters/message_size/message_size_filter.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 29e5d3d436b..e902a7c29fc 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -223,6 +223,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/http/http_filters_plugin.cc )
s.files += %w( src/core/ext/filters/http/message_compress/message_compress_filter.cc )
s.files += %w( src/core/ext/filters/http/message_compress/message_compress_filter.h )
+ s.files += %w( src/core/ext/filters/http/message_decompress/message_decompress_filter.cc )
+ s.files += %w( src/core/ext/filters/http/message_decompress/message_decompress_filter.h )
s.files += %w( src/core/ext/filters/http/server/http_server_filter.cc )
s.files += %w( src/core/ext/filters/http/server/http_server_filter.h )
s.files += %w( src/core/ext/filters/max_age/max_age_filter.cc )
diff --git a/grpc.gyp b/grpc.gyp
index a62c034d65a..458eae5e114 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -497,6 +497,7 @@
'src/core/ext/filters/http/client_authority_filter.cc',
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/message_compress_filter.cc',
+ 'src/core/ext/filters/http/message_decompress/message_decompress_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/max_age/max_age_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
@@ -992,6 +993,7 @@
'src/core/ext/filters/http/client_authority_filter.cc',
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/message_compress_filter.cc',
+ 'src/core/ext/filters/http/message_decompress/message_decompress_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/max_age/max_age_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index ff45450f3a3..c1624ad9b7c 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -174,6 +174,14 @@ typedef struct {
/** Enable/disable support for per-message compression. Defaults to 1, unless
GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0. */
#define GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION "grpc.per_message_compression"
+/** Experimental Arg. Enable/disable support for per-message decompression.
+ Defaults to 1, unless GRPC_ARG_MINIMAL_STACK is enabled, in which case it
+ defaults to 0. If disabled, decompression will be performed lazily by
+ grpc_byte_buffer_reader. This arg also determines whether max message limits
+ will be applied to the decompressed buffer or the non-decompressed buffer. It
+ is recommended to keep this enabled to protect against zip bomb attacks. */
+#define GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION \
+ "grpc.per_message_decompression"
/** Enable/disable support for deadline checking. Defaults to 1, unless
GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0 */
#define GRPC_ARG_ENABLE_DEADLINE_CHECKS "grpc.enable_deadline_checking"
diff --git a/package.xml b/package.xml
index 06d1669ec48..5391f709cd3 100644
--- a/package.xml
+++ b/package.xml
@@ -203,6 +203,8 @@
+
+
diff --git a/src/core/ext/filters/http/http_filters_plugin.cc b/src/core/ext/filters/http/http_filters_plugin.cc
index f03fa0141df..59749d54546 100644
--- a/src/core/ext/filters/http/http_filters_plugin.cc
+++ b/src/core/ext/filters/http/http_filters_plugin.cc
@@ -22,6 +22,7 @@
#include "src/core/ext/filters/http/client/http_client_filter.h"
#include "src/core/ext/filters/http/message_compress/message_compress_filter.h"
+#include "src/core/ext/filters/http/message_decompress/message_decompress_filter.h"
#include "src/core/ext/filters/http/server/http_server_filter.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/surface/call.h"
@@ -36,6 +37,9 @@ typedef struct {
static optional_filter compress_filter = {
&grpc_message_compress_filter, GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION};
+static optional_filter decompress_filter = {
+ &grpc_message_decompress_filter, GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION};
+
static bool is_building_http_like_transport(
grpc_channel_stack_builder* builder) {
grpc_transport* t = grpc_channel_stack_builder_get_transport(builder);
@@ -75,6 +79,15 @@ void grpc_http_filters_init(void) {
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_optional_filter, &compress_filter);
+ grpc_channel_init_register_stage(
+ GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_optional_filter, &decompress_filter);
+ grpc_channel_init_register_stage(
+ GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_optional_filter, &decompress_filter);
+ grpc_channel_init_register_stage(
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_optional_filter, &decompress_filter);
grpc_channel_init_register_stage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_required_filter, (void*)&grpc_http_client_filter);
diff --git a/src/core/ext/filters/http/message_decompress/message_decompress_filter.cc b/src/core/ext/filters/http/message_decompress/message_decompress_filter.cc
new file mode 100644
index 00000000000..c6a5f8d2847
--- /dev/null
+++ b/src/core/ext/filters/http/message_decompress/message_decompress_filter.cc
@@ -0,0 +1,339 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+#include "src/core/ext/filters/http/message_decompress/message_decompress_filter.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/compression/algorithm_metadata.h"
+#include "src/core/lib/compression/compression_args.h"
+#include "src/core/lib/compression/compression_internal.h"
+#include "src/core/lib/compression/message_compress.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/surface/call.h"
+#include "src/core/lib/transport/static_metadata.h"
+
+namespace {
+
+class ChannelData {};
+
+class CallData {
+ public:
+ CallData(const grpc_call_element_args& args)
+ : call_combiner_(args.call_combiner) {
+ // Initialize state for recv_initial_metadata_ready callback
+ GRPC_CLOSURE_INIT(&on_recv_initial_metadata_ready_,
+ OnRecvInitialMetadataReady, this,
+ grpc_schedule_on_exec_ctx);
+ // Initialize state for recv_message_ready callback
+ grpc_slice_buffer_init(&recv_slices_);
+ GRPC_CLOSURE_INIT(&on_recv_message_next_done_, OnRecvMessageNextDone, this,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&on_recv_message_ready_, OnRecvMessageReady, this,
+ grpc_schedule_on_exec_ctx);
+ // Initialize state for recv_trailing_metadata_ready callback
+ GRPC_CLOSURE_INIT(&on_recv_trailing_metadata_ready_,
+ OnRecvTrailingMetadataReady, this,
+ grpc_schedule_on_exec_ctx);
+ }
+
+ static void DecompressStartTransportStreamOpBatch(
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
+
+ static void OnRecvInitialMetadataReady(void* arg, grpc_error* error);
+
+ // Methods for processing a receive message event
+ void MaybeResumeOnRecvMessageReady();
+ static void OnRecvMessageReady(void* arg, grpc_error* error);
+ static void OnRecvMessageNextDone(void* arg, grpc_error* error);
+ grpc_error* PullSliceFromRecvMessage();
+ void ContinueReadingRecvMessage();
+ void FinishRecvMessage();
+ void ContinueRecvMessageReadyCallback(grpc_error* error);
+
+ // Methods for processing a recv_trailing_metadata event
+ void MaybeResumeOnRecvTrailingMetadataReady();
+ static void OnRecvTrailingMetadataReady(void* arg, grpc_error* error);
+
+ private:
+ grpc_core::CallCombiner* call_combiner_ = nullptr;
+ // Overall error for the call
+ grpc_error* error_ = GRPC_ERROR_NONE;
+ // Fields for handling recv_initial_metadata_ready callback
+ grpc_closure on_recv_initial_metadata_ready_;
+ grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
+ grpc_metadata_batch* recv_initial_metadata_ = nullptr;
+ // Fields for handling recv_message_ready callback
+ bool seen_recv_message_ready_ = false;
+ grpc_message_compression_algorithm algorithm_ = GRPC_MESSAGE_COMPRESS_NONE;
+ grpc_closure on_recv_message_ready_;
+ grpc_closure* original_recv_message_ready_ = nullptr;
+ grpc_closure on_recv_message_next_done_;
+ grpc_core::OrphanablePtr* recv_message_ = nullptr;
+ // recv_slices_ holds the slices read from the original recv_message stream.
+ // It is initialized during construction and reset when a new stream is
+ // created using it.
+ grpc_slice_buffer recv_slices_;
+ grpc_core::ManualConstructor
+ recv_replacement_stream_;
+ // Fields for handling recv_trailing_metadata_ready callback
+ bool seen_recv_trailing_metadata_ready_ = false;
+ grpc_closure on_recv_trailing_metadata_ready_;
+ grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
+ grpc_error* on_recv_trailing_metadata_ready_error_ = GRPC_ERROR_NONE;
+};
+
+grpc_message_compression_algorithm DecodeMessageCompressionAlgorithm(
+ grpc_mdelem md) {
+ grpc_message_compression_algorithm algorithm =
+ grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
+ if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
+ char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ gpr_log(GPR_ERROR,
+ "Invalid incoming message compression algorithm: '%s'. "
+ "Interpreting incoming data as uncompressed.",
+ md_c_str);
+ gpr_free(md_c_str);
+ return GRPC_MESSAGE_COMPRESS_NONE;
+ }
+ return algorithm;
+}
+
+void CallData::OnRecvInitialMetadataReady(void* arg, grpc_error* error) {
+ CallData* calld = static_cast(arg);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_linked_mdelem* grpc_encoding =
+ calld->recv_initial_metadata_->idx.named.grpc_encoding;
+ if (grpc_encoding != nullptr) {
+ calld->algorithm_ = DecodeMessageCompressionAlgorithm(grpc_encoding->md);
+ grpc_metadata_batch_remove(calld->recv_initial_metadata_,
+ GRPC_BATCH_GRPC_ENCODING);
+ }
+ }
+ calld->MaybeResumeOnRecvMessageReady();
+ calld->MaybeResumeOnRecvTrailingMetadataReady();
+ grpc_closure* closure = calld->original_recv_initial_metadata_ready_;
+ calld->original_recv_initial_metadata_ready_ = nullptr;
+ grpc_core::Closure::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(error));
+}
+
+void CallData::MaybeResumeOnRecvMessageReady() {
+ if (seen_recv_message_ready_) {
+ seen_recv_message_ready_ = false;
+ GRPC_CALL_COMBINER_START(call_combiner_, &on_recv_message_ready_,
+ GRPC_ERROR_NONE,
+ "continue recv_message_ready callback");
+ }
+}
+
+void CallData::OnRecvMessageReady(void* arg, grpc_error* error) {
+ CallData* calld = static_cast(arg);
+ if (error == GRPC_ERROR_NONE &&
+ calld->algorithm_ != GRPC_MESSAGE_COMPRESS_NONE) {
+ // recv_message can be NULL if trailing metadata is received instead of
+ // message.
+ if (*calld->recv_message_ == nullptr ||
+ (*calld->recv_message_)->length() == 0) {
+ calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_NONE);
+ return;
+ }
+ if (calld->original_recv_initial_metadata_ready_ != nullptr) {
+ calld->seen_recv_message_ready_ = true;
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
+ "Deferring OnRecvMessageReady until after "
+ "OnRecvInitialMetadataReady");
+ return;
+ }
+ calld->ContinueReadingRecvMessage();
+ } else {
+ calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error));
+ }
+}
+
+void CallData::ContinueReadingRecvMessage() {
+ while ((*recv_message_)
+ ->Next(~static_cast(0), &on_recv_message_next_done_)) {
+ grpc_error* error = PullSliceFromRecvMessage();
+ if (error != GRPC_ERROR_NONE) {
+ return ContinueRecvMessageReadyCallback(error);
+ }
+ // We have read the entire message.
+ if (recv_slices_.length == (*recv_message_)->length()) {
+ return FinishRecvMessage();
+ }
+ }
+}
+
+grpc_error* CallData::PullSliceFromRecvMessage() {
+ grpc_slice incoming_slice;
+ grpc_error* error = (*recv_message_)->Pull(&incoming_slice);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_slice_buffer_add(&recv_slices_, incoming_slice);
+ }
+ return error;
+}
+
+void CallData::OnRecvMessageNextDone(void* arg, grpc_error* error) {
+ CallData* calld = static_cast(arg);
+ if (error != GRPC_ERROR_NONE) {
+ return calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error));
+ }
+ error = calld->PullSliceFromRecvMessage();
+ if (error != GRPC_ERROR_NONE) {
+ return calld->ContinueRecvMessageReadyCallback(error);
+ }
+ if (calld->recv_slices_.length == (*calld->recv_message_)->length()) {
+ calld->FinishRecvMessage();
+ } else {
+ calld->ContinueReadingRecvMessage();
+ }
+}
+
+void CallData::FinishRecvMessage() {
+ grpc_slice_buffer decompressed_slices;
+ if (grpc_msg_decompress(algorithm_, &recv_slices_, &decompressed_slices) ==
+ 0) {
+ gpr_log(
+ GPR_ERROR,
+ "Unexpected error decompressing data for algorithm with enum value %d",
+ algorithm_);
+ }
+ uint32_t recv_flags = (*recv_message_)->flags();
+ // Swap out the original receive byte stream with our new one and send the
+ // batch down.
+ recv_replacement_stream_.Init(&recv_slices_, recv_flags);
+ recv_message_->reset(recv_replacement_stream_.get());
+ recv_message_ = nullptr;
+ ContinueRecvMessageReadyCallback(GRPC_ERROR_NONE);
+}
+
+void CallData::ContinueRecvMessageReadyCallback(grpc_error* error) {
+ MaybeResumeOnRecvTrailingMetadataReady();
+ // The surface will clean up the receiving stream if there is an error.
+ grpc_closure* closure = original_recv_message_ready_;
+ original_recv_message_ready_ = nullptr;
+ grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
+}
+
+void CallData::MaybeResumeOnRecvTrailingMetadataReady() {
+ if (seen_recv_trailing_metadata_ready_) {
+ seen_recv_trailing_metadata_ready_ = false;
+ grpc_error* error = on_recv_trailing_metadata_ready_error_;
+ on_recv_trailing_metadata_ready_error_ = GRPC_ERROR_NONE;
+ GRPC_CALL_COMBINER_START(call_combiner_, &on_recv_trailing_metadata_ready_,
+ error, "Continuing OnRecvTrailingMetadataReady");
+ }
+}
+
+void CallData::OnRecvTrailingMetadataReady(void* arg, grpc_error* error) {
+ CallData* calld = static_cast(arg);
+ if (calld->original_recv_initial_metadata_ready_ != nullptr ||
+ calld->original_recv_message_ready_ != nullptr) {
+ calld->seen_recv_trailing_metadata_ready_ = true;
+ calld->on_recv_trailing_metadata_ready_error_ = GRPC_ERROR_REF(error);
+ GRPC_CALL_COMBINER_STOP(
+ calld->call_combiner_,
+ "Deferring OnRecvTrailingMetadataReady until after "
+ "OnRecvInitialMetadataReady and OnRecvMessageReady");
+ return;
+ }
+ error = grpc_error_add_child(GRPC_ERROR_REF(error), calld->error_);
+ calld->error_ = GRPC_ERROR_NONE;
+ grpc_closure* closure = calld->original_recv_trailing_metadata_ready_;
+ calld->original_recv_trailing_metadata_ready_ = nullptr;
+ grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
+}
+
+void CallData::DecompressStartTransportStreamOpBatch(
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
+ GPR_TIMER_SCOPE("compress_start_transport_stream_op_batch", 0);
+ CallData* calld = static_cast(elem->call_data);
+ // Handle recv_initial_metadata.
+ if (batch->recv_initial_metadata) {
+ calld->recv_initial_metadata_ =
+ batch->payload->recv_initial_metadata.recv_initial_metadata;
+ calld->original_recv_initial_metadata_ready_ =
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->on_recv_initial_metadata_ready_;
+ }
+ // Handle recv_message
+ if (batch->recv_message) {
+ calld->recv_message_ = batch->payload->recv_message.recv_message;
+ calld->original_recv_message_ready_ =
+ batch->payload->recv_message.recv_message_ready;
+ batch->payload->recv_message.recv_message_ready =
+ &calld->on_recv_message_ready_;
+ }
+ // Handle recv_trailing_metadata
+ if (batch->recv_trailing_metadata) {
+ calld->original_recv_trailing_metadata_ready_ =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &calld->on_recv_trailing_metadata_ready_;
+ }
+ // Pass control down the stack.
+ grpc_call_next_op(elem, batch);
+}
+
+static grpc_error* DecompressInitCallElem(grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ new (elem->call_data) CallData(*args);
+ return GRPC_ERROR_NONE;
+}
+
+static void DecompressDestroyCallElem(
+ grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
+ grpc_closure* /*ignored*/) {
+ CallData* calld = static_cast(elem->call_data);
+ calld->~CallData();
+}
+
+static grpc_error* DecompressInitChannelElem(
+ grpc_channel_element* /*elem*/, grpc_channel_element_args* /*args*/) {
+ return GRPC_ERROR_NONE;
+}
+
+void DecompressDestroyChannelElem(grpc_channel_element* /*elem*/) { return; }
+
+} // namespace
+
+const grpc_channel_filter grpc_message_decompress_filter = {
+ CallData::DecompressStartTransportStreamOpBatch,
+ grpc_channel_next_op,
+ sizeof(CallData),
+ DecompressInitCallElem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ DecompressDestroyCallElem,
+ 0, // sizeof(ChannelData)
+ DecompressInitChannelElem,
+ DecompressDestroyChannelElem,
+ grpc_channel_next_get_info,
+ "message_decompress"};
diff --git a/src/core/ext/filters/http/message_decompress/message_decompress_filter.h b/src/core/ext/filters/http/message_decompress/message_decompress_filter.h
new file mode 100644
index 00000000000..5e6b01d5cbb
--- /dev/null
+++ b/src/core/ext/filters/http/message_decompress/message_decompress_filter.h
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_DECOMPRESS_MESSAGE_DECOMPRESS_FILTER_H
+#define GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_DECOMPRESS_MESSAGE_DECOMPRESS_FILTER_H
+
+#include
+
+#include "src/core/lib/channel/channel_stack.h"
+
+extern const grpc_channel_filter grpc_message_decompress_filter;
+
+#endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_DECOMPRESS_MESSAGE_DECOMPRESS_FILTER_H \
+ */
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index fcebe9bc410..339b2438c03 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -200,6 +200,8 @@ struct grpc_call {
/* Stream compression algorithm for *incoming* data */
grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
GRPC_STREAM_COMPRESS_NONE;
+ /* Maximum size for uncompressed receive message in bytes. -1 for unlimited */
+ int max_uncompressed_receive_message_length = -1;
/* Supported encodings (compression algorithms), a bitset.
* Always support no compression. */
uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
@@ -497,6 +499,11 @@ void grpc_call_set_completion_queue(grpc_call* call,
&call->pollent);
}
+void grpc_call_set_max_uncompressed_receive_message_length(grpc_call* call,
+ int limit) {
+ call->max_uncompressed_receive_message_length = limit;
+}
+
#ifndef NDEBUG
#define REF_REASON reason
#define REF_ARG , const char* reason
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index a33664af6a9..351e57d75cb 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -59,6 +59,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
void grpc_call_set_completion_queue(grpc_call* call, grpc_completion_queue* cq);
+/* Sets the max uncompressed receive message length for the call. */
+void grpc_call_set_max_uncompressed_receive_message_length(grpc_call* call,
+ int limit);
+
#ifndef NDEBUG
void grpc_call_internal_ref(grpc_call* call, const char* reason);
void grpc_call_internal_unref(grpc_call* call, const char* reason);
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 6c2dfed4ae3..607eb74c354 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -82,6 +82,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/http/client_authority_filter.cc',
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/message_compress_filter.cc',
+ 'src/core/ext/filters/http/message_decompress/message_decompress_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/max_age/max_age_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 60e99d7b5a0..3e9f42468a0 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -1186,6 +1186,8 @@ src/core/ext/filters/http/client_authority_filter.h \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.h \
+src/core/ext/filters/http/message_decompress/message_decompress_filter.cc \
+src/core/ext/filters/http/message_decompress/message_decompress_filter.h \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/http/server/http_server_filter.h \
src/core/ext/filters/max_age/max_age_filter.cc \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index fef1fd11d4a..8b6f9a7cb3c 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -986,6 +986,8 @@ src/core/ext/filters/http/client_authority_filter.h \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.h \
+src/core/ext/filters/http/message_decompress/message_decompress_filter.cc \
+src/core/ext/filters/http/message_decompress/message_decompress_filter.h \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/http/server/http_server_filter.h \
src/core/ext/filters/max_age/max_age_filter.cc \