From 0497ad8bb0c02995f16a76b2ce85753a026f08bc Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 10 Jun 2020 15:25:40 -0700 Subject: [PATCH] Add missing message-size check before decompressing Add and fix tests for limit check before decompression Code restructuring to allow easy reuse of service config call data Regenerate projects Reviewer comments --- BUILD | 2 + BUILD.gn | 1 + CMakeLists.txt | 2 + Makefile | 2 + build_autogenerated.yaml | 2 + config.m4 | 1 + config.w32 | 1 + gRPC-Core.podspec | 1 + grpc.gemspec | 1 + grpc.gyp | 2 + package.xml | 1 + .../service_config_channel_arg_filter.cc | 142 ++++++++ .../ext/filters/http/http_filters_plugin.cc | 3 +- .../message_decompress_filter.cc | 92 +++-- .../message_decompress_filter.h | 4 +- .../message_size/message_size_filter.cc | 106 +++--- .../message_size/message_size_filter.h | 6 + .../plugin_registry/grpc_plugin_registry.cc | 4 + .../grpc_unsecure_plugin_registry.cc | 4 + src/python/grpcio/grpc_core_dependencies.py | 1 + test/core/end2end/tests/max_message_length.cc | 332 ++++++++++++++++++ test/cpp/microbenchmarks/bm_call_create.cc | 3 +- tools/doxygen/Doxyfile.c++.internal | 1 + tools/doxygen/Doxyfile.core.internal | 1 + 24 files changed, 622 insertions(+), 93 deletions(-) create mode 100644 src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc diff --git a/BUILD b/BUILD index 0d2a7ca058c..bbb1d8dfeea 100644 --- a/BUILD +++ b/BUILD @@ -1044,6 +1044,7 @@ grpc_cc_library( "src/core/ext/filters/client_channel/retry_throttle.cc", "src/core/ext/filters/client_channel/server_address.cc", "src/core/ext/filters/client_channel/service_config.cc", + "src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc", "src/core/ext/filters/client_channel/service_config_parser.cc", "src/core/ext/filters/client_channel/subchannel.cc", "src/core/ext/filters/client_channel/subchannel_pool_interface.cc", @@ -1184,6 +1185,7 @@ grpc_cc_library( language = "c++", deps = [ "grpc_base", + "grpc_message_size_filter", ], ) diff --git a/BUILD.gn b/BUILD.gn index a16d298f1be..5e5f8622b34 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -293,6 +293,7 @@ config("grpc_config") { "src/core/ext/filters/client_channel/service_config.cc", "src/core/ext/filters/client_channel/service_config.h", "src/core/ext/filters/client_channel/service_config_call_data.h", + "src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc", "src/core/ext/filters/client_channel/service_config_parser.cc", "src/core/ext/filters/client_channel/service_config_parser.h", "src/core/ext/filters/client_channel/subchannel.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index 323f7800627..730dc9b0775 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1375,6 +1375,7 @@ add_library(grpc src/core/ext/filters/client_channel/retry_throttle.cc src/core/ext/filters/client_channel/server_address.cc src/core/ext/filters/client_channel/service_config.cc + src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc src/core/ext/filters/client_channel/service_config_parser.cc src/core/ext/filters/client_channel/subchannel.cc src/core/ext/filters/client_channel/subchannel_pool_interface.cc @@ -2046,6 +2047,7 @@ add_library(grpc_unsecure src/core/ext/filters/client_channel/retry_throttle.cc src/core/ext/filters/client_channel/server_address.cc src/core/ext/filters/client_channel/service_config.cc + src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc src/core/ext/filters/client_channel/service_config_parser.cc src/core/ext/filters/client_channel/subchannel.cc src/core/ext/filters/client_channel/subchannel_pool_interface.cc diff --git a/Makefile b/Makefile index bbaccc44513..2de807dead0 100644 --- a/Makefile +++ b/Makefile @@ -3677,6 +3677,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/retry_throttle.cc \ src/core/ext/filters/client_channel/server_address.cc \ src/core/ext/filters/client_channel/service_config.cc \ + src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \ src/core/ext/filters/client_channel/service_config_parser.cc \ src/core/ext/filters/client_channel/subchannel.cc \ src/core/ext/filters/client_channel/subchannel_pool_interface.cc \ @@ -4322,6 +4323,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/client_channel/retry_throttle.cc \ src/core/ext/filters/client_channel/server_address.cc \ src/core/ext/filters/client_channel/service_config.cc \ + src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \ src/core/ext/filters/client_channel/service_config_parser.cc \ src/core/ext/filters/client_channel/subchannel.cc \ src/core/ext/filters/client_channel/subchannel_pool_interface.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 9eb53b8e576..57ee5eabde8 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -793,6 +793,7 @@ libs: - src/core/ext/filters/client_channel/retry_throttle.cc - src/core/ext/filters/client_channel/server_address.cc - src/core/ext/filters/client_channel/service_config.cc + - src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc - src/core/ext/filters/client_channel/service_config_parser.cc - src/core/ext/filters/client_channel/subchannel.cc - src/core/ext/filters/client_channel/subchannel_pool_interface.cc @@ -1652,6 +1653,7 @@ libs: - src/core/ext/filters/client_channel/retry_throttle.cc - src/core/ext/filters/client_channel/server_address.cc - src/core/ext/filters/client_channel/service_config.cc + - src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc - src/core/ext/filters/client_channel/service_config_parser.cc - src/core/ext/filters/client_channel/subchannel.cc - src/core/ext/filters/client_channel/subchannel_pool_interface.cc diff --git a/config.m4 b/config.m4 index cd1019abcd9..4f0fd8513d9 100644 --- a/config.m4 +++ b/config.m4 @@ -92,6 +92,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/retry_throttle.cc \ src/core/ext/filters/client_channel/server_address.cc \ src/core/ext/filters/client_channel/service_config.cc \ + src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \ src/core/ext/filters/client_channel/service_config_parser.cc \ src/core/ext/filters/client_channel/subchannel.cc \ src/core/ext/filters/client_channel/subchannel_pool_interface.cc \ diff --git a/config.w32 b/config.w32 index 8109f9103af..093add492ad 100644 --- a/config.w32 +++ b/config.w32 @@ -61,6 +61,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\retry_throttle.cc " + "src\\core\\ext\\filters\\client_channel\\server_address.cc " + "src\\core\\ext\\filters\\client_channel\\service_config.cc " + + "src\\core\\ext\\filters\\client_channel\\service_config_channel_arg_filter.cc " + "src\\core\\ext\\filters\\client_channel\\service_config_parser.cc " + "src\\core\\ext\\filters\\client_channel\\subchannel.cc " + "src\\core\\ext\\filters\\client_channel\\subchannel_pool_interface.cc " + diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 695768422c2..e4ae503d0a2 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -277,6 +277,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/service_config.cc', 'src/core/ext/filters/client_channel/service_config.h', 'src/core/ext/filters/client_channel/service_config_call_data.h', + 'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc', 'src/core/ext/filters/client_channel/service_config_parser.cc', 'src/core/ext/filters/client_channel/service_config_parser.h', 'src/core/ext/filters/client_channel/subchannel.cc', diff --git a/grpc.gemspec b/grpc.gemspec index af04b9d9de4..66abcc3f64f 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -199,6 +199,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/service_config.cc ) s.files += %w( src/core/ext/filters/client_channel/service_config.h ) s.files += %w( src/core/ext/filters/client_channel/service_config_call_data.h ) + s.files += %w( src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc ) s.files += %w( src/core/ext/filters/client_channel/service_config_parser.cc ) s.files += %w( src/core/ext/filters/client_channel/service_config_parser.h ) s.files += %w( src/core/ext/filters/client_channel/subchannel.cc ) diff --git a/grpc.gyp b/grpc.gyp index b80efd83534..bc1a11e3cca 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -487,6 +487,7 @@ 'src/core/ext/filters/client_channel/retry_throttle.cc', 'src/core/ext/filters/client_channel/server_address.cc', 'src/core/ext/filters/client_channel/service_config.cc', + 'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc', 'src/core/ext/filters/client_channel/service_config_parser.cc', 'src/core/ext/filters/client_channel/subchannel.cc', 'src/core/ext/filters/client_channel/subchannel_pool_interface.cc', @@ -994,6 +995,7 @@ 'src/core/ext/filters/client_channel/retry_throttle.cc', 'src/core/ext/filters/client_channel/server_address.cc', 'src/core/ext/filters/client_channel/service_config.cc', + 'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc', 'src/core/ext/filters/client_channel/service_config_parser.cc', 'src/core/ext/filters/client_channel/subchannel.cc', 'src/core/ext/filters/client_channel/subchannel_pool_interface.cc', diff --git a/package.xml b/package.xml index 0166a778789..93b22c15c0b 100644 --- a/package.xml +++ b/package.xml @@ -179,6 +179,7 @@ + diff --git a/src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc b/src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc new file mode 100644 index 00000000000..9aad7c71307 --- /dev/null +++ b/src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc @@ -0,0 +1,142 @@ +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// This filter reads GRPC_ARG_SERVICE_CONFIG and populates ServiceConfigCallData +// in the call context per call for direct channels. + +#include + +#include "src/core/ext/filters/client_channel/service_config_call_data.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/surface/channel_init.h" + +namespace grpc_core { + +namespace { + +class ServiceConfigChannelArgCallData {}; + +class ServiceConfigChannelArgChannelData { + public: + explicit ServiceConfigChannelArgChannelData( + const grpc_channel_element_args* args) { + const char* service_config_str = grpc_channel_arg_find_string( + args->channel_args, GRPC_ARG_SERVICE_CONFIG); + if (service_config_str != nullptr) { + grpc_error* service_config_error = GRPC_ERROR_NONE; + auto svc_cfg = grpc_core::ServiceConfig::Create(service_config_str, + &service_config_error); + if (service_config_error == GRPC_ERROR_NONE) { + svc_cfg_ = std::move(svc_cfg); + } else { + gpr_log(GPR_ERROR, "%s", grpc_error_string(service_config_error)); + } + GRPC_ERROR_UNREF(service_config_error); + } + } + + grpc_core::RefCountedPtr svc_cfg() const { + return svc_cfg_; + } + + private: + grpc_core::RefCountedPtr svc_cfg_; +}; + +grpc_error* ServiceConfigChannelArgInitCallElem( + grpc_call_element* elem, const grpc_call_element_args* args) { + ServiceConfigChannelArgChannelData* chand = + static_cast(elem->channel_data); + if (chand->svc_cfg() != nullptr) { + GPR_DEBUG_ASSERT(args->context != nullptr); + args->arena->New( + chand->svc_cfg(), + chand->svc_cfg()->GetMethodParsedConfigVector(args->path), + args->context); + } + return GRPC_ERROR_NONE; +} + +void ServiceConfigChannelArgDestroyCallElem( + grpc_call_element* /* elem */, const grpc_call_final_info* /* final_info */, + grpc_closure* /* then_schedule_closure */) {} + +grpc_error* ServiceConfigChannelArgInitChannelElem( + grpc_channel_element* elem, grpc_channel_element_args* args) { + ServiceConfigChannelArgChannelData* chand = + static_cast(elem->channel_data); + new (chand) ServiceConfigChannelArgChannelData(args); + const char* service_config_str = grpc_channel_arg_get_string( + grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG)); + if (service_config_str != nullptr) { + grpc_error* service_config_error = GRPC_ERROR_NONE; + auto svc_cfg = grpc_core::ServiceConfig::Create(service_config_str, + &service_config_error); + if (service_config_error == GRPC_ERROR_NONE) { + chand->svc_cfg = std::move(svc_cfg); + } else { + gpr_log(GPR_ERROR, "%s", grpc_error_string(service_config_error)); + } + GRPC_ERROR_UNREF(service_config_error); + } + return GRPC_ERROR_NONE; +} + +void ServiceConfigChannelArgDestroyChannelElem(grpc_channel_element* elem) { + ServiceConfigChannelArgChannelData* chand = + static_cast(elem->channel_data); + chand->~ServiceConfigChannelArgChannelData(); +} + +const grpc_channel_filter ServiceConfigChannelArgFilter = { + grpc_call_next_op, + grpc_channel_next_op, + sizeof(ServiceConfigChannelArgCallData), + ServiceConfigChannelArgInitCallElem, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + ServiceConfigChannelArgDestroyCallElem, + sizeof(ServiceConfigChannelArgChannelData), + ServiceConfigChannelArgInitChannelElem, + ServiceConfigChannelArgDestroyChannelElem, + grpc_channel_next_get_info, + "service_config_channel_arg"}; + +bool maybe_add_service_config_channel_arg_filter( + grpc_channel_stack_builder* builder, void* /* arg */) { + const grpc_channel_args* channel_args = + grpc_channel_stack_builder_get_channel_arguments(builder); + if (grpc_channel_args_want_minimal_stack(channel_args) || + grpc_channel_arg_get_string(grpc_channel_args_find( + channel_args, GRPC_ARG_SERVICE_CONFIG)) == nullptr) { + return true; + } + return grpc_channel_stack_builder_prepend_filter( + builder, &ServiceConfigChannelArgFilter, nullptr, nullptr); +} + +} // namespace + +} // namespace grpc_core + +void grpc_service_config_channel_arg_filter_init(void) { + grpc_channel_init_register_stage( + GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + grpc_core::maybe_add_service_config_channel_arg_filter, nullptr); +} + +void grpc_service_config_channel_arg_filter_shutdown(void) {} diff --git a/src/core/ext/filters/http/http_filters_plugin.cc b/src/core/ext/filters/http/http_filters_plugin.cc index 2fedc7fe3d7..637dc3030f2 100644 --- a/src/core/ext/filters/http/http_filters_plugin.cc +++ b/src/core/ext/filters/http/http_filters_plugin.cc @@ -38,7 +38,8 @@ static optional_filter compress_filter = { &grpc_message_compress_filter, GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION}; static optional_filter decompress_filter = { - &grpc_message_decompress_filter, GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION}; + &grpc_core::MessageDecompressFilter, + GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION}; static bool is_building_http_like_transport( grpc_channel_stack_builder* builder) { diff --git a/src/core/ext/filters/http/message_compress/message_decompress_filter.cc b/src/core/ext/filters/http/message_compress/message_decompress_filter.cc index d12f4013bb2..122c74e1e98 100644 --- a/src/core/ext/filters/http/message_compress/message_decompress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_decompress_filter.cc @@ -18,6 +18,8 @@ #include +#include "src/core/ext/filters/http/message_compress/message_decompress_filter.h" + #include #include @@ -27,7 +29,8 @@ #include #include -#include "src/core/ext/filters/http/message_compress/message_decompress_filter.h" +#include "absl/strings/str_format.h" +#include "src/core/ext/filters/message_size/message_size_filter.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/compression/algorithm_metadata.h" #include "src/core/lib/compression/compression_args.h" @@ -37,14 +40,25 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" +namespace grpc_core { namespace { -class ChannelData {}; +class ChannelData { + public: + explicit ChannelData(const grpc_channel_element_args* args) + : max_recv_size_(get_max_recv_size(args->channel_args)) {} + + int max_recv_size() const { return max_recv_size_; } + + private: + int max_recv_size_; +}; class CallData { public: - explicit CallData(const grpc_call_element_args& args) - : call_combiner_(args.call_combiner) { + CallData(const grpc_call_element_args& args, const ChannelData* chand) + : call_combiner_(args.call_combiner), + max_recv_message_length_(chand->max_recv_size()) { // Initialize state for recv_initial_metadata_ready callback GRPC_CLOSURE_INIT(&on_recv_initial_metadata_ready_, OnRecvInitialMetadataReady, this, @@ -59,6 +73,13 @@ class CallData { GRPC_CLOSURE_INIT(&on_recv_trailing_metadata_ready_, OnRecvTrailingMetadataReady, this, grpc_schedule_on_exec_ctx); + const MessageSizeParsedConfig* limits = + get_message_size_config_from_call_context(args.context); + if (limits != nullptr && limits->limits().max_recv_size >= 0 && + (limits->limits().max_recv_size < max_recv_message_length_ || + max_recv_message_length_ < 0)) { + max_recv_message_length_ = limits->limits().max_recv_size; + } } ~CallData() { grpc_slice_buffer_destroy_internal(&recv_slices_); } @@ -82,7 +103,7 @@ class CallData { void MaybeResumeOnRecvTrailingMetadataReady(); static void OnRecvTrailingMetadataReady(void* arg, grpc_error* error); - grpc_core::CallCombiner* call_combiner_; + CallCombiner* call_combiner_; // Overall error for the call grpc_error* error_ = GRPC_ERROR_NONE; // Fields for handling recv_initial_metadata_ready callback @@ -91,17 +112,18 @@ class CallData { grpc_metadata_batch* recv_initial_metadata_ = nullptr; // Fields for handling recv_message_ready callback bool seen_recv_message_ready_ = false; + int max_recv_message_length_; grpc_message_compression_algorithm algorithm_ = GRPC_MESSAGE_COMPRESS_NONE; grpc_closure on_recv_message_ready_; grpc_closure* original_recv_message_ready_ = nullptr; grpc_closure on_recv_message_next_done_; - grpc_core::OrphanablePtr* recv_message_ = nullptr; + OrphanablePtr* recv_message_ = nullptr; // recv_slices_ holds the slices read from the original recv_message stream. // It is initialized during construction and reset when a new stream is // created using it. grpc_slice_buffer recv_slices_; - std::aligned_storage::type + std::aligned_storage::type recv_replacement_stream_; // Fields for handling recv_trailing_metadata_ready callback bool seen_recv_trailing_metadata_ready_ = false; @@ -139,7 +161,7 @@ void CallData::OnRecvInitialMetadataReady(void* arg, grpc_error* error) { calld->MaybeResumeOnRecvTrailingMetadataReady(); grpc_closure* closure = calld->original_recv_initial_metadata_ready_; calld->original_recv_initial_metadata_ready_ = nullptr; - grpc_core::Closure::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(error)); + Closure::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(error)); } void CallData::MaybeResumeOnRecvMessageReady() { @@ -170,6 +192,19 @@ void CallData::OnRecvMessageReady(void* arg, grpc_error* error) { 0) { return calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_NONE); } + if (calld->max_recv_message_length_ >= 0 && + (*calld->recv_message_)->length() > + static_cast(calld->max_recv_message_length_)) { + std::string message_string = absl::StrFormat( + "Received message larger than max (%u vs. %d)", + (*calld->recv_message_)->length(), calld->max_recv_message_length_); + GPR_DEBUG_ASSERT(calld->error_ == GRPC_ERROR_NONE); + calld->error_ = grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string.c_str()), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED); + return calld->ContinueRecvMessageReadyCallback( + GRPC_ERROR_REF(calld->error_)); + } grpc_slice_buffer_destroy_internal(&calld->recv_slices_); grpc_slice_buffer_init(&calld->recv_slices_); return calld->ContinueReadingRecvMessage(); @@ -241,9 +276,9 @@ void CallData::FinishRecvMessage() { // Initializing recv_replacement_stream_ with decompressed_slices removes // all the slices from decompressed_slices leaving it empty. new (&recv_replacement_stream_) - grpc_core::SliceBufferByteStream(&decompressed_slices, recv_flags); - recv_message_->reset(reinterpret_cast( - &recv_replacement_stream_)); + SliceBufferByteStream(&decompressed_slices, recv_flags); + recv_message_->reset( + reinterpret_cast(&recv_replacement_stream_)); recv_message_ = nullptr; } ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error_)); @@ -254,7 +289,7 @@ void CallData::ContinueRecvMessageReadyCallback(grpc_error* error) { // The surface will clean up the receiving stream if there is an error. grpc_closure* closure = original_recv_message_ready_; original_recv_message_ready_ = nullptr; - grpc_core::Closure::Run(DEBUG_LOCATION, closure, error); + Closure::Run(DEBUG_LOCATION, closure, error); } void CallData::MaybeResumeOnRecvTrailingMetadataReady() { @@ -283,7 +318,7 @@ void CallData::OnRecvTrailingMetadataReady(void* arg, grpc_error* error) { calld->error_ = GRPC_ERROR_NONE; grpc_closure* closure = calld->original_recv_trailing_metadata_ready_; calld->original_recv_trailing_metadata_ready_ = nullptr; - grpc_core::Closure::Run(DEBUG_LOCATION, closure, error); + Closure::Run(DEBUG_LOCATION, closure, error); } void CallData::DecompressStartTransportStreamOpBatch( @@ -322,37 +357,44 @@ void DecompressStartTransportStreamOpBatch( calld->DecompressStartTransportStreamOpBatch(elem, batch); } -static grpc_error* DecompressInitCallElem(grpc_call_element* elem, - const grpc_call_element_args* args) { - new (elem->call_data) CallData(*args); +grpc_error* DecompressInitCallElem(grpc_call_element* elem, + const grpc_call_element_args* args) { + ChannelData* chand = static_cast(elem->channel_data); + new (elem->call_data) CallData(*args, chand); return GRPC_ERROR_NONE; } -static void DecompressDestroyCallElem( - grpc_call_element* elem, const grpc_call_final_info* /*final_info*/, - grpc_closure* /*ignored*/) { +void DecompressDestroyCallElem(grpc_call_element* elem, + const grpc_call_final_info* /*final_info*/, + grpc_closure* /*ignored*/) { CallData* calld = static_cast(elem->call_data); calld->~CallData(); } -static grpc_error* DecompressInitChannelElem( - grpc_channel_element* /*elem*/, grpc_channel_element_args* /*args*/) { +grpc_error* DecompressInitChannelElem(grpc_channel_element* elem, + grpc_channel_element_args* args) { + ChannelData* chand = static_cast(elem->channel_data); + new (chand) ChannelData(args); return GRPC_ERROR_NONE; } -void DecompressDestroyChannelElem(grpc_channel_element* /*elem*/) {} +void DecompressDestroyChannelElem(grpc_channel_element* elem) { + ChannelData* chand = static_cast(elem->channel_data); + chand->~ChannelData(); +} } // namespace -const grpc_channel_filter grpc_message_decompress_filter = { +const grpc_channel_filter MessageDecompressFilter = { DecompressStartTransportStreamOpBatch, grpc_channel_next_op, sizeof(CallData), DecompressInitCallElem, grpc_call_stack_ignore_set_pollset_or_pollset_set, DecompressDestroyCallElem, - 0, // sizeof(ChannelData) + sizeof(ChannelData), DecompressInitChannelElem, DecompressDestroyChannelElem, grpc_channel_next_get_info, "message_decompress"}; +} // namespace grpc_core diff --git a/src/core/ext/filters/http/message_compress/message_decompress_filter.h b/src/core/ext/filters/http/message_compress/message_decompress_filter.h index 7d567bf08a2..f19a4ca0cbd 100644 --- a/src/core/ext/filters/http/message_compress/message_decompress_filter.h +++ b/src/core/ext/filters/http/message_compress/message_decompress_filter.h @@ -23,7 +23,9 @@ #include "src/core/lib/channel/channel_stack.h" -extern const grpc_channel_filter grpc_message_decompress_filter; +namespace grpc_core { +extern const grpc_channel_filter MessageDecompressFilter; +} // namespace grpc_core #endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H \ */ diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index d2ef5477636..53b4da451f5 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -45,6 +45,25 @@ namespace { size_t g_message_size_parser_index; } // namespace +// +// MessageSizeParsedConfig +// + +const MessageSizeParsedConfig* MessageSizeParsedConfig::GetFromCallContext( + const grpc_call_context_element* context) { + if (context == nullptr) return nullptr; + auto* svc_cfg_call_data = static_cast( + context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value); + if (svc_cfg_call_data == nullptr) return nullptr; + return static_cast( + svc_cfg_call_data->GetMethodParsedConfig( + MessageSizeParser::ParserIndex())); +} + +// +// MessageSizeParser +// + std::unique_ptr MessageSizeParser::ParsePerMethodParams(const Json& json, grpc_error** error) { GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); @@ -97,12 +116,26 @@ void MessageSizeParser::Register() { } size_t MessageSizeParser::ParserIndex() { return g_message_size_parser_index; } + +int GetMaxRecvSizeFromChannelArgs(const grpc_channel_args* args) { + if (grpc_channel_args_want_minimal_stack(args)) return -1; + return grpc_channel_args_find_integer( + args, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, + {GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, -1, INT_MAX}); +} + +int GetMaxSendSizeFromChannelArgs(const grpc_channel_args* args) { + if (grpc_channel_args_want_minimal_stack(args)) return -1; + return grpc_channel_args_find_integer( + args, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, + {GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, -1, INT_MAX}); +} + } // namespace grpc_core namespace { struct channel_data { grpc_core::MessageSizeParsedConfig::message_size_limits limits; - grpc_core::RefCountedPtr svc_cfg; }; struct call_data { @@ -118,24 +151,8 @@ struct call_data { // Note: Per-method config is only available on the client, so we // apply the max request size to the send limit and the max response // size to the receive limit. - const grpc_core::MessageSizeParsedConfig* limits = nullptr; - grpc_core::ServiceConfigCallData* svc_cfg_call_data = nullptr; - if (args.context != nullptr) { - svc_cfg_call_data = static_cast( - args.context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value); - } - if (svc_cfg_call_data != nullptr) { - limits = static_cast( - svc_cfg_call_data->GetMethodParsedConfig( - grpc_core::MessageSizeParser::ParserIndex())); - } else if (chand.svc_cfg != nullptr) { - const auto* objs_vector = - chand.svc_cfg->GetMethodParsedConfigVector(args.path); - if (objs_vector != nullptr) { - limits = static_cast( - (*objs_vector)[grpc_core::MessageSizeParser::ParserIndex()].get()); - } - } + const grpc_core::MessageSizeParsedConfig* limits = + grpc_core::get_message_size_config_from_call_context(args.context); if (limits != nullptr) { if (limits->limits().max_send_size >= 0 && (limits->limits().max_send_size < this->limits.max_send_size || @@ -288,35 +305,11 @@ static void message_size_destroy_call_elem( calld->~call_data(); } -static int default_size(const grpc_channel_args* args, - int without_minimal_stack) { - if (grpc_channel_args_want_minimal_stack(args)) { - return -1; - } - return without_minimal_stack; -} - grpc_core::MessageSizeParsedConfig::message_size_limits get_message_size_limits( const grpc_channel_args* channel_args) { grpc_core::MessageSizeParsedConfig::message_size_limits lim; - lim.max_send_size = - default_size(channel_args, GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH); - lim.max_recv_size = - default_size(channel_args, GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH); - for (size_t i = 0; i < channel_args->num_args; ++i) { - if (strcmp(channel_args->args[i].key, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == - 0) { - const grpc_integer_options options = {lim.max_send_size, -1, INT_MAX}; - lim.max_send_size = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - } - if (strcmp(channel_args->args[i].key, - GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) { - const grpc_integer_options options = {lim.max_recv_size, -1, INT_MAX}; - lim.max_recv_size = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - } - } + lim.max_send_size = grpc_core::get_max_send_size(channel_args); + lim.max_recv_size = grpc_core::get_max_recv_size(channel_args); return lim; } @@ -327,26 +320,6 @@ static grpc_error* message_size_init_channel_elem( channel_data* chand = static_cast(elem->channel_data); new (chand) channel_data(); chand->limits = get_message_size_limits(args->channel_args); - // TODO(yashykt): We only need to read GRPC_ARG_SERVICE_CONFIG in the case of - // direct channels. (Service config is otherwise stored in the call_context by - // client_channel filter.) If we ever need a second filter that also needs to - // parse GRPC_ARG_SERVICE_CONFIG, we should refactor this code and add a - // separate filter that reads GRPC_ARG_SERVICE_CONFIG and saves the parsed - // config in the call_context. - const grpc_arg* channel_arg = - grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG); - const char* service_config_str = grpc_channel_arg_get_string(channel_arg); - if (service_config_str != nullptr) { - grpc_error* service_config_error = GRPC_ERROR_NONE; - auto svc_cfg = grpc_core::ServiceConfig::Create(service_config_str, - &service_config_error); - if (service_config_error == GRPC_ERROR_NONE) { - chand->svc_cfg = std::move(svc_cfg); - } else { - gpr_log(GPR_ERROR, "%s", grpc_error_string(service_config_error)); - } - GRPC_ERROR_UNREF(service_config_error); - } return GRPC_ERROR_NONE; } @@ -387,6 +360,9 @@ static bool maybe_add_message_size_filter(grpc_channel_stack_builder* builder, void* /*arg*/) { const grpc_channel_args* channel_args = grpc_channel_stack_builder_get_channel_arguments(builder); + if (grpc_channel_args_want_minimal_stack(channel_args)) { + return true; + } bool enable = false; grpc_core::MessageSizeParsedConfig::message_size_limits lim = get_message_size_limits(channel_args); diff --git a/src/core/ext/filters/message_size/message_size_filter.h b/src/core/ext/filters/message_size/message_size_filter.h index 132d7b2af0f..ea0dd0266d0 100644 --- a/src/core/ext/filters/message_size/message_size_filter.h +++ b/src/core/ext/filters/message_size/message_size_filter.h @@ -40,6 +40,9 @@ class MessageSizeParsedConfig : public ServiceConfigParser::ParsedConfig { const message_size_limits& limits() const { return limits_; } + static const MessageSizeParsedConfig* GetFromCallContext( + const grpc_call_context_element* context); + private: message_size_limits limits_; }; @@ -54,6 +57,9 @@ class MessageSizeParser : public ServiceConfigParser::Parser { static size_t ParserIndex(); }; +int GetMaxRecvSizeFromChannelArgs(const grpc_channel_args* args); +int GetMaxSendSizeFromChannelArgs(const grpc_channel_args* args); + } // namespace grpc_core #endif /* GRPC_CORE_EXT_FILTERS_MESSAGE_SIZE_MESSAGE_SIZE_FILTER_H */ diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index f1c3cbf4036..ef8b10df647 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -64,6 +64,8 @@ void grpc_max_age_filter_init(void); void grpc_max_age_filter_shutdown(void); void grpc_message_size_filter_init(void); void grpc_message_size_filter_shutdown(void); +void grpc_service_config_channel_arg_filter_init(void); +void grpc_service_config_channel_arg_filter_shutdown(void); void grpc_client_authority_filter_init(void); void grpc_client_authority_filter_shutdown(void); void grpc_workaround_cronet_compression_filter_init(void); @@ -114,6 +116,8 @@ void grpc_register_built_in_plugins(void) { grpc_max_age_filter_shutdown); grpc_register_plugin(grpc_message_size_filter_init, grpc_message_size_filter_shutdown); + grpc_register_plugin(grpc_service_config_channel_arg_filter_init, + grpc_service_config_channel_arg_filter_shutdown); grpc_register_plugin(grpc_client_authority_filter_init, grpc_client_authority_filter_shutdown); grpc_register_plugin(grpc_workaround_cronet_compression_filter_init, diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc index de53f173294..525fa108d81 100644 --- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc @@ -64,6 +64,8 @@ void grpc_max_age_filter_init(void); void grpc_max_age_filter_shutdown(void); void grpc_message_size_filter_init(void); void grpc_message_size_filter_shutdown(void); +void grpc_service_config_channel_arg_filter_init(void); +void grpc_service_config_channel_arg_filter_shutdown(void); void grpc_client_authority_filter_init(void); void grpc_client_authority_filter_shutdown(void); void grpc_workaround_cronet_compression_filter_init(void); @@ -114,6 +116,8 @@ void grpc_register_built_in_plugins(void) { grpc_max_age_filter_shutdown); grpc_register_plugin(grpc_message_size_filter_init, grpc_message_size_filter_shutdown); + grpc_register_plugin(grpc_service_config_channel_arg_filter_init, + grpc_service_config_channel_arg_filter_shutdown); grpc_register_plugin(grpc_client_authority_filter_init, grpc_client_authority_filter_shutdown); grpc_register_plugin(grpc_workaround_cronet_compression_filter_init, diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 37bed16955f..7afe714e078 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -70,6 +70,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/retry_throttle.cc', 'src/core/ext/filters/client_channel/server_address.cc', 'src/core/ext/filters/client_channel/service_config.cc', + 'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc', 'src/core/ext/filters/client_channel/service_config_parser.cc', 'src/core/ext/filters/client_channel/subchannel.cc', 'src/core/ext/filters/client_channel/subchannel_pool_interface.cc', diff --git a/test/core/end2end/tests/max_message_length.cc b/test/core/end2end/tests/max_message_length.cc index 40e752a3d63..256cc982940 100644 --- a/test/core/end2end/tests/max_message_length.cc +++ b/test/core/end2end/tests/max_message_length.cc @@ -29,6 +29,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/transport/metadata.h" #include "test/core/end2end/cq_verifier.h" @@ -466,6 +467,328 @@ static void test_max_message_length_on_response(grpc_end2end_test_config config, grpc_byte_buffer_destroy(response_payload); grpc_byte_buffer_destroy(recv_payload); + grpc_call_unref(c); + if (s != nullptr) grpc_call_unref(s); + cq_verifier_destroy(cqv); + end_test(&f); + config.tear_down_data(&f); +} + +static grpc_metadata gzip_compression_override() { + grpc_metadata gzip_compression_override; + gzip_compression_override.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST; + gzip_compression_override.value = grpc_slice_from_static_string("gzip"); + memset(&gzip_compression_override.internal_data, 0, + sizeof(gzip_compression_override.internal_data)); + return gzip_compression_override; +} + +// Test receive message limit with compressed request larger than the limit +static void test_max_receive_message_length_on_compressed_request( + grpc_end2end_test_config config, bool minimal_stack) { + gpr_log(GPR_INFO, + "test max receive message length on compressed request with " + "minimal_stack=%d", + minimal_stack); + grpc_end2end_test_fixture f; + grpc_call* c = nullptr; + grpc_call* s = nullptr; + cq_verifier* cqv; + grpc_op ops[6]; + grpc_op* op; + grpc_slice request_payload_slice = grpc_slice_malloc(1024); + memset(GRPC_SLICE_START_PTR(request_payload_slice), 'a', 1024); + grpc_byte_buffer* request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_byte_buffer* recv_payload = nullptr; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_status_code status; + grpc_call_error error; + grpc_slice details, status_details; + int was_cancelled = 2; + + // Set limit via channel args. + grpc_arg arg[2]; + arg[0] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), 5); + arg[1] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_MINIMAL_STACK), minimal_stack); + grpc_channel_args* server_args = + grpc_channel_args_copy_and_add(nullptr, arg, 2); + + f = begin_test(config, "test_max_request_message_length", nullptr, + server_args); + { + grpc_core::ExecCtx exec_ctx; + grpc_channel_args_destroy(server_args); + } + cqv = cq_verifier_create(f.cq); + c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq, + grpc_slice_from_static_string("/service/method"), + nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), + nullptr); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + grpc_metadata compression_md = gzip_compression_override(); + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 1; + op->data.send_initial_metadata.metadata = &compression_md; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->flags = 0; + op->reserved = nullptr; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(1), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(101)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(101), 1); + cq_verify(cqv); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &recv_payload; + op->flags = 0; + op->reserved = nullptr; + op++; + if (minimal_stack) { + /* Expect the RPC to proceed normally for a minimal stack */ + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + status_details = grpc_slice_from_static_string("xyz"); + op->data.send_status_from_server.status_details = &status_details; + op->flags = 0; + op->reserved = nullptr; + op++; + } + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(102), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + CQ_EXPECT_COMPLETION(cqv, tag(102), 1); + CQ_EXPECT_COMPLETION(cqv, tag(1), 1); + cq_verify(cqv); + + GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method")); + if (minimal_stack) { + /* We do not perform message size checks for minimal stack. */ + GPR_ASSERT(status == GRPC_STATUS_OK); + } else { + GPR_ASSERT(was_cancelled == 1); + GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED); + GPR_ASSERT(grpc_slice_str_cmp( + details, "Received message larger than max (29 vs. 5)") == + 0); + } + grpc_slice_unref(details); + grpc_slice_unref(request_payload_slice); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(recv_payload); + grpc_call_unref(c); + if (s != nullptr) grpc_call_unref(s); + cq_verifier_destroy(cqv); + + end_test(&f); + config.tear_down_data(&f); +} + +// Test receive message limit with compressed response larger than the limit. +static void test_max_receive_message_length_on_compressed_response( + grpc_end2end_test_config config, bool minimal_stack) { + gpr_log(GPR_INFO, + "testing max receive message length on compressed response with " + "minimal_stack=%d", + minimal_stack); + grpc_end2end_test_fixture f; + grpc_call* c = nullptr; + grpc_call* s = nullptr; + cq_verifier* cqv; + grpc_op ops[6]; + grpc_op* op; + grpc_slice response_payload_slice = grpc_slice_malloc(1024); + memset(GRPC_SLICE_START_PTR(response_payload_slice), 'a', 1024); + grpc_byte_buffer* response_payload = + grpc_raw_byte_buffer_create(&response_payload_slice, 1); + grpc_byte_buffer* recv_payload = nullptr; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_status_code status; + grpc_call_error error; + grpc_slice details; + int was_cancelled = 2; + + // Set limit via channel args. + grpc_arg arg[2]; + arg[0] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), 5); + arg[1] = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_MINIMAL_STACK), minimal_stack); + grpc_channel_args* client_args = + grpc_channel_args_copy_and_add(nullptr, arg, 2); + + f = begin_test(config, "test_max_response_message_length", client_args, + nullptr); + { + grpc_core::ExecCtx exec_ctx; + grpc_channel_args_destroy(client_args); + } + cqv = cq_verifier_create(f.cq); + + c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq, + grpc_slice_from_static_string("/service/method"), + nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), + nullptr); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &recv_payload; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->flags = 0; + op->reserved = nullptr; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(1), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(101)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(101), 1); + cq_verify(cqv); + + grpc_metadata compression_md = gzip_compression_override(); + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 1; + op->data.send_initial_metadata.metadata = &compression_md; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = response_payload; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + grpc_slice status_details = grpc_slice_from_static_string("xyz"); + op->data.send_status_from_server.status_details = &status_details; + op->flags = 0; + op->reserved = nullptr; + op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(102), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + CQ_EXPECT_COMPLETION(cqv, tag(102), 1); + CQ_EXPECT_COMPLETION(cqv, tag(1), 1); + cq_verify(cqv); + + GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method")); + if (minimal_stack) { + /* We do not perform message size checks for minimal stack. */ + GPR_ASSERT(status == GRPC_STATUS_OK); + } else { + GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED); + GPR_ASSERT(grpc_slice_str_cmp( + details, "Received message larger than max (29 vs. 5)") == + 0); + } + grpc_slice_unref(details); + grpc_slice_unref(response_payload_slice); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + grpc_byte_buffer_destroy(response_payload); + grpc_byte_buffer_destroy(recv_payload); + grpc_call_unref(c); if (s != nullptr) grpc_call_unref(s); @@ -500,6 +823,15 @@ void max_message_length(grpc_end2end_test_config config) { test_max_message_length_on_response(config, false /* send_limit */, true /* use_service_config */, true /* use_string_json_value */); + /* The following tests are not useful for inproc transport and do not work + * with our simple proxy. */ + if (strcmp(config.name, "inproc") != 0 && + (config.feature_mask & FEATURE_MASK_SUPPORTS_REQUEST_PROXYING) == 0) { + test_max_receive_message_length_on_compressed_request(config, false); + test_max_receive_message_length_on_compressed_request(config, true); + test_max_receive_message_length_on_compressed_response(config, false); + test_max_receive_message_length_on_compressed_response(config, true); + } } void max_message_length_pre_init(void) {} diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index 803ad38f1c5..b4a70272ea6 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -529,9 +529,10 @@ static void BM_IsolatedFilter(benchmark::State& state) { grpc_call_final_info final_info; TestOp test_op_data; const int kArenaSize = 4096; + grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {}; grpc_call_element_args call_args{call_stack, nullptr, - nullptr, + context, method, start_time, deadline, diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 613e824f4d7..3a4fdf1b017 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1162,6 +1162,7 @@ src/core/ext/filters/client_channel/server_address.h \ src/core/ext/filters/client_channel/service_config.cc \ src/core/ext/filters/client_channel/service_config.h \ src/core/ext/filters/client_channel/service_config_call_data.h \ +src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \ src/core/ext/filters/client_channel/service_config_parser.cc \ src/core/ext/filters/client_channel/service_config_parser.h \ src/core/ext/filters/client_channel/subchannel.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index b026c8b1f8a..1ed428487fe 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -962,6 +962,7 @@ src/core/ext/filters/client_channel/server_address.h \ src/core/ext/filters/client_channel/service_config.cc \ src/core/ext/filters/client_channel/service_config.h \ src/core/ext/filters/client_channel/service_config_call_data.h \ +src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \ src/core/ext/filters/client_channel/service_config_parser.cc \ src/core/ext/filters/client_channel/service_config_parser.h \ src/core/ext/filters/client_channel/subchannel.cc \