diff --git a/BUILD b/BUILD
index 99750d060cc..6cdc1a5104a 100644
--- a/BUILD
+++ b/BUILD
@@ -1045,6 +1045,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/retry_throttle.cc",
"src/core/ext/filters/client_channel/server_address.cc",
"src/core/ext/filters/client_channel/service_config.cc",
+ "src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc",
"src/core/ext/filters/client_channel/service_config_parser.cc",
"src/core/ext/filters/client_channel/subchannel.cc",
"src/core/ext/filters/client_channel/subchannel_pool_interface.cc",
@@ -1185,6 +1186,7 @@ grpc_cc_library(
language = "c++",
deps = [
"grpc_base",
+ "grpc_message_size_filter",
],
)
diff --git a/BUILD.gn b/BUILD.gn
index 378250dd2c0..48a1d84e668 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -293,6 +293,7 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/service_config.cc",
"src/core/ext/filters/client_channel/service_config.h",
"src/core/ext/filters/client_channel/service_config_call_data.h",
+ "src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc",
"src/core/ext/filters/client_channel/service_config_parser.cc",
"src/core/ext/filters/client_channel/service_config_parser.h",
"src/core/ext/filters/client_channel/subchannel.cc",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4e0c695a5bc..3c36a9053b5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1375,6 +1375,7 @@ add_library(grpc
src/core/ext/filters/client_channel/retry_throttle.cc
src/core/ext/filters/client_channel/server_address.cc
src/core/ext/filters/client_channel/service_config.cc
+ src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
src/core/ext/filters/client_channel/service_config_parser.cc
src/core/ext/filters/client_channel/subchannel.cc
src/core/ext/filters/client_channel/subchannel_pool_interface.cc
@@ -2046,6 +2047,7 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/retry_throttle.cc
src/core/ext/filters/client_channel/server_address.cc
src/core/ext/filters/client_channel/service_config.cc
+ src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
src/core/ext/filters/client_channel/service_config_parser.cc
src/core/ext/filters/client_channel/subchannel.cc
src/core/ext/filters/client_channel/subchannel_pool_interface.cc
diff --git a/Makefile b/Makefile
index 83c7600a698..f1be51c42dc 100644
--- a/Makefile
+++ b/Makefile
@@ -3677,6 +3677,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/retry_throttle.cc \
src/core/ext/filters/client_channel/server_address.cc \
src/core/ext/filters/client_channel/service_config.cc \
+ src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \
src/core/ext/filters/client_channel/service_config_parser.cc \
src/core/ext/filters/client_channel/subchannel.cc \
src/core/ext/filters/client_channel/subchannel_pool_interface.cc \
@@ -4322,6 +4323,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/retry_throttle.cc \
src/core/ext/filters/client_channel/server_address.cc \
src/core/ext/filters/client_channel/service_config.cc \
+ src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \
src/core/ext/filters/client_channel/service_config_parser.cc \
src/core/ext/filters/client_channel/subchannel.cc \
src/core/ext/filters/client_channel/subchannel_pool_interface.cc \
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 5f09c7b3e73..a5552dfcdbb 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -793,6 +793,7 @@ libs:
- src/core/ext/filters/client_channel/retry_throttle.cc
- src/core/ext/filters/client_channel/server_address.cc
- src/core/ext/filters/client_channel/service_config.cc
+ - src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
- src/core/ext/filters/client_channel/service_config_parser.cc
- src/core/ext/filters/client_channel/subchannel.cc
- src/core/ext/filters/client_channel/subchannel_pool_interface.cc
@@ -1652,6 +1653,7 @@ libs:
- src/core/ext/filters/client_channel/retry_throttle.cc
- src/core/ext/filters/client_channel/server_address.cc
- src/core/ext/filters/client_channel/service_config.cc
+ - src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
- src/core/ext/filters/client_channel/service_config_parser.cc
- src/core/ext/filters/client_channel/subchannel.cc
- src/core/ext/filters/client_channel/subchannel_pool_interface.cc
diff --git a/config.m4 b/config.m4
index cd1019abcd9..4f0fd8513d9 100644
--- a/config.m4
+++ b/config.m4
@@ -92,6 +92,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/retry_throttle.cc \
src/core/ext/filters/client_channel/server_address.cc \
src/core/ext/filters/client_channel/service_config.cc \
+ src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \
src/core/ext/filters/client_channel/service_config_parser.cc \
src/core/ext/filters/client_channel/subchannel.cc \
src/core/ext/filters/client_channel/subchannel_pool_interface.cc \
diff --git a/config.w32 b/config.w32
index 8109f9103af..093add492ad 100644
--- a/config.w32
+++ b/config.w32
@@ -61,6 +61,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\retry_throttle.cc " +
"src\\core\\ext\\filters\\client_channel\\server_address.cc " +
"src\\core\\ext\\filters\\client_channel\\service_config.cc " +
+ "src\\core\\ext\\filters\\client_channel\\service_config_channel_arg_filter.cc " +
"src\\core\\ext\\filters\\client_channel\\service_config_parser.cc " +
"src\\core\\ext\\filters\\client_channel\\subchannel.cc " +
"src\\core\\ext\\filters\\client_channel\\subchannel_pool_interface.cc " +
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 695768422c2..e4ae503d0a2 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -277,6 +277,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/service_config.cc',
'src/core/ext/filters/client_channel/service_config.h',
'src/core/ext/filters/client_channel/service_config_call_data.h',
+ 'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc',
'src/core/ext/filters/client_channel/service_config_parser.cc',
'src/core/ext/filters/client_channel/service_config_parser.h',
'src/core/ext/filters/client_channel/subchannel.cc',
diff --git a/grpc.gemspec b/grpc.gemspec
index af04b9d9de4..66abcc3f64f 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -199,6 +199,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/service_config.cc )
s.files += %w( src/core/ext/filters/client_channel/service_config.h )
s.files += %w( src/core/ext/filters/client_channel/service_config_call_data.h )
+ s.files += %w( src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc )
s.files += %w( src/core/ext/filters/client_channel/service_config_parser.cc )
s.files += %w( src/core/ext/filters/client_channel/service_config_parser.h )
s.files += %w( src/core/ext/filters/client_channel/subchannel.cc )
diff --git a/grpc.gyp b/grpc.gyp
index 4022227f5e3..f8e1d48c3cc 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -487,6 +487,7 @@
'src/core/ext/filters/client_channel/retry_throttle.cc',
'src/core/ext/filters/client_channel/server_address.cc',
'src/core/ext/filters/client_channel/service_config.cc',
+ 'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc',
'src/core/ext/filters/client_channel/service_config_parser.cc',
'src/core/ext/filters/client_channel/subchannel.cc',
'src/core/ext/filters/client_channel/subchannel_pool_interface.cc',
@@ -994,6 +995,7 @@
'src/core/ext/filters/client_channel/retry_throttle.cc',
'src/core/ext/filters/client_channel/server_address.cc',
'src/core/ext/filters/client_channel/service_config.cc',
+ 'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc',
'src/core/ext/filters/client_channel/service_config_parser.cc',
'src/core/ext/filters/client_channel/subchannel.cc',
'src/core/ext/filters/client_channel/subchannel_pool_interface.cc',
diff --git a/package.xml b/package.xml
index 0166a778789..93b22c15c0b 100644
--- a/package.xml
+++ b/package.xml
@@ -179,6 +179,7 @@
+
diff --git a/src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc b/src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
new file mode 100644
index 00000000000..6b8e6373af7
--- /dev/null
+++ b/src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
@@ -0,0 +1,142 @@
+//
+// Copyright 2020 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+// This filter reads GRPC_ARG_SERVICE_CONFIG and populates ServiceConfigCallData
+// in the call context per call for direct channels.
+
+#include
+
+#include "src/core/ext/filters/client_channel/service_config_call_data.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/channel/channel_stack_builder.h"
+#include "src/core/lib/surface/channel_init.h"
+
+namespace grpc_core {
+
+namespace {
+
+class ServiceConfigChannelArgChannelData {
+ public:
+ explicit ServiceConfigChannelArgChannelData(
+ const grpc_channel_element_args* args) {
+ const char* service_config_str = grpc_channel_args_find_string(
+ args->channel_args, GRPC_ARG_SERVICE_CONFIG);
+ if (service_config_str != nullptr) {
+ grpc_error* service_config_error = GRPC_ERROR_NONE;
+ auto service_config =
+ ServiceConfig::Create(service_config_str, &service_config_error);
+ if (service_config_error == GRPC_ERROR_NONE) {
+ service_config_ = std::move(service_config);
+ } else {
+ gpr_log(GPR_ERROR, "%s", grpc_error_string(service_config_error));
+ }
+ GRPC_ERROR_UNREF(service_config_error);
+ }
+ }
+
+ RefCountedPtr service_config() const {
+ return service_config_;
+ }
+
+ private:
+ RefCountedPtr service_config_;
+};
+
+class ServiceConfigChannelArgCallData {
+ public:
+ ServiceConfigChannelArgCallData(grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ ServiceConfigChannelArgChannelData* chand =
+ static_cast(elem->channel_data);
+ RefCountedPtr service_config = chand->service_config();
+ if (service_config != nullptr) {
+ GPR_DEBUG_ASSERT(args->context != nullptr);
+ const auto* method_params_vector =
+ service_config->GetMethodParsedConfigVector(args->path);
+ args->arena->New(
+ std::move(service_config), method_params_vector, args->context);
+ }
+ }
+};
+
+grpc_error* ServiceConfigChannelArgInitCallElem(
+ grpc_call_element* elem, const grpc_call_element_args* args) {
+ ServiceConfigChannelArgCallData* calld =
+ static_cast(elem->call_data);
+ new (calld) ServiceConfigChannelArgCallData(elem, args);
+ return GRPC_ERROR_NONE;
+}
+
+void ServiceConfigChannelArgDestroyCallElem(
+ grpc_call_element* elem, const grpc_call_final_info* /* final_info */,
+ grpc_closure* /* then_schedule_closure */) {
+ ServiceConfigChannelArgCallData* calld =
+ static_cast(elem->call_data);
+ calld->~ServiceConfigChannelArgCallData();
+}
+
+grpc_error* ServiceConfigChannelArgInitChannelElem(
+ grpc_channel_element* elem, grpc_channel_element_args* args) {
+ ServiceConfigChannelArgChannelData* chand =
+ static_cast(elem->channel_data);
+ new (chand) ServiceConfigChannelArgChannelData(args);
+ return GRPC_ERROR_NONE;
+}
+
+void ServiceConfigChannelArgDestroyChannelElem(grpc_channel_element* elem) {
+ ServiceConfigChannelArgChannelData* chand =
+ static_cast(elem->channel_data);
+ chand->~ServiceConfigChannelArgChannelData();
+}
+
+const grpc_channel_filter ServiceConfigChannelArgFilter = {
+ grpc_call_next_op,
+ grpc_channel_next_op,
+ sizeof(ServiceConfigChannelArgCallData),
+ ServiceConfigChannelArgInitCallElem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ ServiceConfigChannelArgDestroyCallElem,
+ sizeof(ServiceConfigChannelArgChannelData),
+ ServiceConfigChannelArgInitChannelElem,
+ ServiceConfigChannelArgDestroyChannelElem,
+ grpc_channel_next_get_info,
+ "service_config_channel_arg"};
+
+bool maybe_add_service_config_channel_arg_filter(
+ grpc_channel_stack_builder* builder, void* /* arg */) {
+ const grpc_channel_args* channel_args =
+ grpc_channel_stack_builder_get_channel_arguments(builder);
+ if (grpc_channel_args_want_minimal_stack(channel_args) ||
+ grpc_channel_args_find_string(channel_args, GRPC_ARG_SERVICE_CONFIG) ==
+ nullptr) {
+ return true;
+ }
+ return grpc_channel_stack_builder_prepend_filter(
+ builder, &ServiceConfigChannelArgFilter, nullptr, nullptr);
+}
+
+} // namespace
+
+} // namespace grpc_core
+
+void grpc_service_config_channel_arg_filter_init(void) {
+ grpc_channel_init_register_stage(
+ GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ grpc_core::maybe_add_service_config_channel_arg_filter, nullptr);
+}
+
+void grpc_service_config_channel_arg_filter_shutdown(void) {}
diff --git a/src/core/ext/filters/http/http_filters_plugin.cc b/src/core/ext/filters/http/http_filters_plugin.cc
index 2fedc7fe3d7..637dc3030f2 100644
--- a/src/core/ext/filters/http/http_filters_plugin.cc
+++ b/src/core/ext/filters/http/http_filters_plugin.cc
@@ -38,7 +38,8 @@ static optional_filter compress_filter = {
&grpc_message_compress_filter, GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION};
static optional_filter decompress_filter = {
- &grpc_message_decompress_filter, GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION};
+ &grpc_core::MessageDecompressFilter,
+ GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION};
static bool is_building_http_like_transport(
grpc_channel_stack_builder* builder) {
diff --git a/src/core/ext/filters/http/message_compress/message_decompress_filter.cc b/src/core/ext/filters/http/message_compress/message_decompress_filter.cc
index d12f4013bb2..c16ea66e7a3 100644
--- a/src/core/ext/filters/http/message_compress/message_decompress_filter.cc
+++ b/src/core/ext/filters/http/message_compress/message_decompress_filter.cc
@@ -18,6 +18,8 @@
#include
+#include "src/core/ext/filters/http/message_compress/message_decompress_filter.h"
+
#include
#include
@@ -27,7 +29,8 @@
#include
#include
-#include "src/core/ext/filters/http/message_compress/message_decompress_filter.h"
+#include "absl/strings/str_format.h"
+#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/compression/algorithm_metadata.h"
#include "src/core/lib/compression/compression_args.h"
@@ -37,14 +40,25 @@
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
+namespace grpc_core {
namespace {
-class ChannelData {};
+class ChannelData {
+ public:
+ explicit ChannelData(const grpc_channel_element_args* args)
+ : max_recv_size_(GetMaxRecvSizeFromChannelArgs(args->channel_args)) {}
+
+ int max_recv_size() const { return max_recv_size_; }
+
+ private:
+ int max_recv_size_;
+};
class CallData {
public:
- explicit CallData(const grpc_call_element_args& args)
- : call_combiner_(args.call_combiner) {
+ CallData(const grpc_call_element_args& args, const ChannelData* chand)
+ : call_combiner_(args.call_combiner),
+ max_recv_message_length_(chand->max_recv_size()) {
// Initialize state for recv_initial_metadata_ready callback
GRPC_CLOSURE_INIT(&on_recv_initial_metadata_ready_,
OnRecvInitialMetadataReady, this,
@@ -59,6 +73,13 @@ class CallData {
GRPC_CLOSURE_INIT(&on_recv_trailing_metadata_ready_,
OnRecvTrailingMetadataReady, this,
grpc_schedule_on_exec_ctx);
+ const MessageSizeParsedConfig* limits =
+ MessageSizeParsedConfig::GetFromCallContext(args.context);
+ if (limits != nullptr && limits->limits().max_recv_size >= 0 &&
+ (limits->limits().max_recv_size < max_recv_message_length_ ||
+ max_recv_message_length_ < 0)) {
+ max_recv_message_length_ = limits->limits().max_recv_size;
+ }
}
~CallData() { grpc_slice_buffer_destroy_internal(&recv_slices_); }
@@ -82,7 +103,7 @@ class CallData {
void MaybeResumeOnRecvTrailingMetadataReady();
static void OnRecvTrailingMetadataReady(void* arg, grpc_error* error);
- grpc_core::CallCombiner* call_combiner_;
+ CallCombiner* call_combiner_;
// Overall error for the call
grpc_error* error_ = GRPC_ERROR_NONE;
// Fields for handling recv_initial_metadata_ready callback
@@ -91,17 +112,18 @@ class CallData {
grpc_metadata_batch* recv_initial_metadata_ = nullptr;
// Fields for handling recv_message_ready callback
bool seen_recv_message_ready_ = false;
+ int max_recv_message_length_;
grpc_message_compression_algorithm algorithm_ = GRPC_MESSAGE_COMPRESS_NONE;
grpc_closure on_recv_message_ready_;
grpc_closure* original_recv_message_ready_ = nullptr;
grpc_closure on_recv_message_next_done_;
- grpc_core::OrphanablePtr* recv_message_ = nullptr;
+ OrphanablePtr* recv_message_ = nullptr;
// recv_slices_ holds the slices read from the original recv_message stream.
// It is initialized during construction and reset when a new stream is
// created using it.
grpc_slice_buffer recv_slices_;
- std::aligned_storage::type
+ std::aligned_storage::type
recv_replacement_stream_;
// Fields for handling recv_trailing_metadata_ready callback
bool seen_recv_trailing_metadata_ready_ = false;
@@ -139,7 +161,7 @@ void CallData::OnRecvInitialMetadataReady(void* arg, grpc_error* error) {
calld->MaybeResumeOnRecvTrailingMetadataReady();
grpc_closure* closure = calld->original_recv_initial_metadata_ready_;
calld->original_recv_initial_metadata_ready_ = nullptr;
- grpc_core::Closure::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(error));
+ Closure::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(error));
}
void CallData::MaybeResumeOnRecvMessageReady() {
@@ -170,6 +192,19 @@ void CallData::OnRecvMessageReady(void* arg, grpc_error* error) {
0) {
return calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_NONE);
}
+ if (calld->max_recv_message_length_ >= 0 &&
+ (*calld->recv_message_)->length() >
+ static_cast(calld->max_recv_message_length_)) {
+ std::string message_string = absl::StrFormat(
+ "Received message larger than max (%u vs. %d)",
+ (*calld->recv_message_)->length(), calld->max_recv_message_length_);
+ GPR_DEBUG_ASSERT(calld->error_ == GRPC_ERROR_NONE);
+ calld->error_ = grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string.c_str()),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED);
+ return calld->ContinueRecvMessageReadyCallback(
+ GRPC_ERROR_REF(calld->error_));
+ }
grpc_slice_buffer_destroy_internal(&calld->recv_slices_);
grpc_slice_buffer_init(&calld->recv_slices_);
return calld->ContinueReadingRecvMessage();
@@ -241,9 +276,9 @@ void CallData::FinishRecvMessage() {
// Initializing recv_replacement_stream_ with decompressed_slices removes
// all the slices from decompressed_slices leaving it empty.
new (&recv_replacement_stream_)
- grpc_core::SliceBufferByteStream(&decompressed_slices, recv_flags);
- recv_message_->reset(reinterpret_cast(
- &recv_replacement_stream_));
+ SliceBufferByteStream(&decompressed_slices, recv_flags);
+ recv_message_->reset(
+ reinterpret_cast(&recv_replacement_stream_));
recv_message_ = nullptr;
}
ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error_));
@@ -254,7 +289,7 @@ void CallData::ContinueRecvMessageReadyCallback(grpc_error* error) {
// The surface will clean up the receiving stream if there is an error.
grpc_closure* closure = original_recv_message_ready_;
original_recv_message_ready_ = nullptr;
- grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
+ Closure::Run(DEBUG_LOCATION, closure, error);
}
void CallData::MaybeResumeOnRecvTrailingMetadataReady() {
@@ -283,7 +318,7 @@ void CallData::OnRecvTrailingMetadataReady(void* arg, grpc_error* error) {
calld->error_ = GRPC_ERROR_NONE;
grpc_closure* closure = calld->original_recv_trailing_metadata_ready_;
calld->original_recv_trailing_metadata_ready_ = nullptr;
- grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
+ Closure::Run(DEBUG_LOCATION, closure, error);
}
void CallData::DecompressStartTransportStreamOpBatch(
@@ -322,37 +357,44 @@ void DecompressStartTransportStreamOpBatch(
calld->DecompressStartTransportStreamOpBatch(elem, batch);
}
-static grpc_error* DecompressInitCallElem(grpc_call_element* elem,
- const grpc_call_element_args* args) {
- new (elem->call_data) CallData(*args);
+grpc_error* DecompressInitCallElem(grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ ChannelData* chand = static_cast(elem->channel_data);
+ new (elem->call_data) CallData(*args, chand);
return GRPC_ERROR_NONE;
}
-static void DecompressDestroyCallElem(
- grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
- grpc_closure* /*ignored*/) {
+void DecompressDestroyCallElem(grpc_call_element* elem,
+ const grpc_call_final_info* /*final_info*/,
+ grpc_closure* /*ignored*/) {
CallData* calld = static_cast(elem->call_data);
calld->~CallData();
}
-static grpc_error* DecompressInitChannelElem(
- grpc_channel_element* /*elem*/, grpc_channel_element_args* /*args*/) {
+grpc_error* DecompressInitChannelElem(grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ ChannelData* chand = static_cast(elem->channel_data);
+ new (chand) ChannelData(args);
return GRPC_ERROR_NONE;
}
-void DecompressDestroyChannelElem(grpc_channel_element* /*elem*/) {}
+void DecompressDestroyChannelElem(grpc_channel_element* elem) {
+ ChannelData* chand = static_cast(elem->channel_data);
+ chand->~ChannelData();
+}
} // namespace
-const grpc_channel_filter grpc_message_decompress_filter = {
+const grpc_channel_filter MessageDecompressFilter = {
DecompressStartTransportStreamOpBatch,
grpc_channel_next_op,
sizeof(CallData),
DecompressInitCallElem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
DecompressDestroyCallElem,
- 0, // sizeof(ChannelData)
+ sizeof(ChannelData),
DecompressInitChannelElem,
DecompressDestroyChannelElem,
grpc_channel_next_get_info,
"message_decompress"};
+} // namespace grpc_core
diff --git a/src/core/ext/filters/http/message_compress/message_decompress_filter.h b/src/core/ext/filters/http/message_compress/message_decompress_filter.h
index 7d567bf08a2..f19a4ca0cbd 100644
--- a/src/core/ext/filters/http/message_compress/message_decompress_filter.h
+++ b/src/core/ext/filters/http/message_compress/message_decompress_filter.h
@@ -23,7 +23,9 @@
#include "src/core/lib/channel/channel_stack.h"
-extern const grpc_channel_filter grpc_message_decompress_filter;
+namespace grpc_core {
+extern const grpc_channel_filter MessageDecompressFilter;
+} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H \
*/
diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc
index d2ef5477636..89fdab6fae8 100644
--- a/src/core/ext/filters/message_size/message_size_filter.cc
+++ b/src/core/ext/filters/message_size/message_size_filter.cc
@@ -45,6 +45,25 @@ namespace {
size_t g_message_size_parser_index;
} // namespace
+//
+// MessageSizeParsedConfig
+//
+
+const MessageSizeParsedConfig* MessageSizeParsedConfig::GetFromCallContext(
+ const grpc_call_context_element* context) {
+ if (context == nullptr) return nullptr;
+ auto* svc_cfg_call_data = static_cast(
+ context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
+ if (svc_cfg_call_data == nullptr) return nullptr;
+ return static_cast(
+ svc_cfg_call_data->GetMethodParsedConfig(
+ MessageSizeParser::ParserIndex()));
+}
+
+//
+// MessageSizeParser
+//
+
std::unique_ptr
MessageSizeParser::ParsePerMethodParams(const Json& json, grpc_error** error) {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
@@ -97,12 +116,26 @@ void MessageSizeParser::Register() {
}
size_t MessageSizeParser::ParserIndex() { return g_message_size_parser_index; }
+
+int GetMaxRecvSizeFromChannelArgs(const grpc_channel_args* args) {
+ if (grpc_channel_args_want_minimal_stack(args)) return -1;
+ return grpc_channel_args_find_integer(
+ args, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH,
+ {GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, -1, INT_MAX});
+}
+
+int GetMaxSendSizeFromChannelArgs(const grpc_channel_args* args) {
+ if (grpc_channel_args_want_minimal_stack(args)) return -1;
+ return grpc_channel_args_find_integer(
+ args, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH,
+ {GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, -1, INT_MAX});
+}
+
} // namespace grpc_core
namespace {
struct channel_data {
grpc_core::MessageSizeParsedConfig::message_size_limits limits;
- grpc_core::RefCountedPtr svc_cfg;
};
struct call_data {
@@ -118,24 +151,8 @@ struct call_data {
// Note: Per-method config is only available on the client, so we
// apply the max request size to the send limit and the max response
// size to the receive limit.
- const grpc_core::MessageSizeParsedConfig* limits = nullptr;
- grpc_core::ServiceConfigCallData* svc_cfg_call_data = nullptr;
- if (args.context != nullptr) {
- svc_cfg_call_data = static_cast(
- args.context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
- }
- if (svc_cfg_call_data != nullptr) {
- limits = static_cast(
- svc_cfg_call_data->GetMethodParsedConfig(
- grpc_core::MessageSizeParser::ParserIndex()));
- } else if (chand.svc_cfg != nullptr) {
- const auto* objs_vector =
- chand.svc_cfg->GetMethodParsedConfigVector(args.path);
- if (objs_vector != nullptr) {
- limits = static_cast(
- (*objs_vector)[grpc_core::MessageSizeParser::ParserIndex()].get());
- }
- }
+ const grpc_core::MessageSizeParsedConfig* limits =
+ grpc_core::MessageSizeParsedConfig::GetFromCallContext(args.context);
if (limits != nullptr) {
if (limits->limits().max_send_size >= 0 &&
(limits->limits().max_send_size < this->limits.max_send_size ||
@@ -288,35 +305,11 @@ static void message_size_destroy_call_elem(
calld->~call_data();
}
-static int default_size(const grpc_channel_args* args,
- int without_minimal_stack) {
- if (grpc_channel_args_want_minimal_stack(args)) {
- return -1;
- }
- return without_minimal_stack;
-}
-
grpc_core::MessageSizeParsedConfig::message_size_limits get_message_size_limits(
const grpc_channel_args* channel_args) {
grpc_core::MessageSizeParsedConfig::message_size_limits lim;
- lim.max_send_size =
- default_size(channel_args, GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH);
- lim.max_recv_size =
- default_size(channel_args, GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH);
- for (size_t i = 0; i < channel_args->num_args; ++i) {
- if (strcmp(channel_args->args[i].key, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) ==
- 0) {
- const grpc_integer_options options = {lim.max_send_size, -1, INT_MAX};
- lim.max_send_size =
- grpc_channel_arg_get_integer(&channel_args->args[i], options);
- }
- if (strcmp(channel_args->args[i].key,
- GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) {
- const grpc_integer_options options = {lim.max_recv_size, -1, INT_MAX};
- lim.max_recv_size =
- grpc_channel_arg_get_integer(&channel_args->args[i], options);
- }
- }
+ lim.max_send_size = grpc_core::GetMaxSendSizeFromChannelArgs(channel_args);
+ lim.max_recv_size = grpc_core::GetMaxRecvSizeFromChannelArgs(channel_args);
return lim;
}
@@ -327,26 +320,6 @@ static grpc_error* message_size_init_channel_elem(
channel_data* chand = static_cast(elem->channel_data);
new (chand) channel_data();
chand->limits = get_message_size_limits(args->channel_args);
- // TODO(yashykt): We only need to read GRPC_ARG_SERVICE_CONFIG in the case of
- // direct channels. (Service config is otherwise stored in the call_context by
- // client_channel filter.) If we ever need a second filter that also needs to
- // parse GRPC_ARG_SERVICE_CONFIG, we should refactor this code and add a
- // separate filter that reads GRPC_ARG_SERVICE_CONFIG and saves the parsed
- // config in the call_context.
- const grpc_arg* channel_arg =
- grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG);
- const char* service_config_str = grpc_channel_arg_get_string(channel_arg);
- if (service_config_str != nullptr) {
- grpc_error* service_config_error = GRPC_ERROR_NONE;
- auto svc_cfg = grpc_core::ServiceConfig::Create(service_config_str,
- &service_config_error);
- if (service_config_error == GRPC_ERROR_NONE) {
- chand->svc_cfg = std::move(svc_cfg);
- } else {
- gpr_log(GPR_ERROR, "%s", grpc_error_string(service_config_error));
- }
- GRPC_ERROR_UNREF(service_config_error);
- }
return GRPC_ERROR_NONE;
}
@@ -387,6 +360,9 @@ static bool maybe_add_message_size_filter(grpc_channel_stack_builder* builder,
void* /*arg*/) {
const grpc_channel_args* channel_args =
grpc_channel_stack_builder_get_channel_arguments(builder);
+ if (grpc_channel_args_want_minimal_stack(channel_args)) {
+ return true;
+ }
bool enable = false;
grpc_core::MessageSizeParsedConfig::message_size_limits lim =
get_message_size_limits(channel_args);
diff --git a/src/core/ext/filters/message_size/message_size_filter.h b/src/core/ext/filters/message_size/message_size_filter.h
index 132d7b2af0f..ea0dd0266d0 100644
--- a/src/core/ext/filters/message_size/message_size_filter.h
+++ b/src/core/ext/filters/message_size/message_size_filter.h
@@ -40,6 +40,9 @@ class MessageSizeParsedConfig : public ServiceConfigParser::ParsedConfig {
const message_size_limits& limits() const { return limits_; }
+ static const MessageSizeParsedConfig* GetFromCallContext(
+ const grpc_call_context_element* context);
+
private:
message_size_limits limits_;
};
@@ -54,6 +57,9 @@ class MessageSizeParser : public ServiceConfigParser::Parser {
static size_t ParserIndex();
};
+int GetMaxRecvSizeFromChannelArgs(const grpc_channel_args* args);
+int GetMaxSendSizeFromChannelArgs(const grpc_channel_args* args);
+
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_MESSAGE_SIZE_MESSAGE_SIZE_FILTER_H */
diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc
index f1c3cbf4036..ef8b10df647 100644
--- a/src/core/plugin_registry/grpc_plugin_registry.cc
+++ b/src/core/plugin_registry/grpc_plugin_registry.cc
@@ -64,6 +64,8 @@ void grpc_max_age_filter_init(void);
void grpc_max_age_filter_shutdown(void);
void grpc_message_size_filter_init(void);
void grpc_message_size_filter_shutdown(void);
+void grpc_service_config_channel_arg_filter_init(void);
+void grpc_service_config_channel_arg_filter_shutdown(void);
void grpc_client_authority_filter_init(void);
void grpc_client_authority_filter_shutdown(void);
void grpc_workaround_cronet_compression_filter_init(void);
@@ -114,6 +116,8 @@ void grpc_register_built_in_plugins(void) {
grpc_max_age_filter_shutdown);
grpc_register_plugin(grpc_message_size_filter_init,
grpc_message_size_filter_shutdown);
+ grpc_register_plugin(grpc_service_config_channel_arg_filter_init,
+ grpc_service_config_channel_arg_filter_shutdown);
grpc_register_plugin(grpc_client_authority_filter_init,
grpc_client_authority_filter_shutdown);
grpc_register_plugin(grpc_workaround_cronet_compression_filter_init,
diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
index de53f173294..525fa108d81 100644
--- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
+++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
@@ -64,6 +64,8 @@ void grpc_max_age_filter_init(void);
void grpc_max_age_filter_shutdown(void);
void grpc_message_size_filter_init(void);
void grpc_message_size_filter_shutdown(void);
+void grpc_service_config_channel_arg_filter_init(void);
+void grpc_service_config_channel_arg_filter_shutdown(void);
void grpc_client_authority_filter_init(void);
void grpc_client_authority_filter_shutdown(void);
void grpc_workaround_cronet_compression_filter_init(void);
@@ -114,6 +116,8 @@ void grpc_register_built_in_plugins(void) {
grpc_max_age_filter_shutdown);
grpc_register_plugin(grpc_message_size_filter_init,
grpc_message_size_filter_shutdown);
+ grpc_register_plugin(grpc_service_config_channel_arg_filter_init,
+ grpc_service_config_channel_arg_filter_shutdown);
grpc_register_plugin(grpc_client_authority_filter_init,
grpc_client_authority_filter_shutdown);
grpc_register_plugin(grpc_workaround_cronet_compression_filter_init,
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 37bed16955f..7afe714e078 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -70,6 +70,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/retry_throttle.cc',
'src/core/ext/filters/client_channel/server_address.cc',
'src/core/ext/filters/client_channel/service_config.cc',
+ 'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc',
'src/core/ext/filters/client_channel/service_config_parser.cc',
'src/core/ext/filters/client_channel/subchannel.cc',
'src/core/ext/filters/client_channel/subchannel_pool_interface.cc',
diff --git a/test/core/end2end/tests/max_message_length.cc b/test/core/end2end/tests/max_message_length.cc
index 40e752a3d63..256cc982940 100644
--- a/test/core/end2end/tests/max_message_length.cc
+++ b/test/core/end2end/tests/max_message_length.cc
@@ -29,6 +29,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/transport/metadata.h"
#include "test/core/end2end/cq_verifier.h"
@@ -466,6 +467,328 @@ static void test_max_message_length_on_response(grpc_end2end_test_config config,
grpc_byte_buffer_destroy(response_payload);
grpc_byte_buffer_destroy(recv_payload);
+ grpc_call_unref(c);
+ if (s != nullptr) grpc_call_unref(s);
+ cq_verifier_destroy(cqv);
+ end_test(&f);
+ config.tear_down_data(&f);
+}
+
+static grpc_metadata gzip_compression_override() {
+ grpc_metadata gzip_compression_override;
+ gzip_compression_override.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
+ gzip_compression_override.value = grpc_slice_from_static_string("gzip");
+ memset(&gzip_compression_override.internal_data, 0,
+ sizeof(gzip_compression_override.internal_data));
+ return gzip_compression_override;
+}
+
+// Test receive message limit with compressed request larger than the limit
+static void test_max_receive_message_length_on_compressed_request(
+ grpc_end2end_test_config config, bool minimal_stack) {
+ gpr_log(GPR_INFO,
+ "test max receive message length on compressed request with "
+ "minimal_stack=%d",
+ minimal_stack);
+ grpc_end2end_test_fixture f;
+ grpc_call* c = nullptr;
+ grpc_call* s = nullptr;
+ cq_verifier* cqv;
+ grpc_op ops[6];
+ grpc_op* op;
+ grpc_slice request_payload_slice = grpc_slice_malloc(1024);
+ memset(GRPC_SLICE_START_PTR(request_payload_slice), 'a', 1024);
+ grpc_byte_buffer* request_payload =
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+ grpc_byte_buffer* recv_payload = nullptr;
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ grpc_metadata_array request_metadata_recv;
+ grpc_call_details call_details;
+ grpc_status_code status;
+ grpc_call_error error;
+ grpc_slice details, status_details;
+ int was_cancelled = 2;
+
+ // Set limit via channel args.
+ grpc_arg arg[2];
+ arg[0] = grpc_channel_arg_integer_create(
+ const_cast(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), 5);
+ arg[1] = grpc_channel_arg_integer_create(
+ const_cast(GRPC_ARG_MINIMAL_STACK), minimal_stack);
+ grpc_channel_args* server_args =
+ grpc_channel_args_copy_and_add(nullptr, arg, 2);
+
+ f = begin_test(config, "test_max_request_message_length", nullptr,
+ server_args);
+ {
+ grpc_core::ExecCtx exec_ctx;
+ grpc_channel_args_destroy(server_args);
+ }
+ cqv = cq_verifier_create(f.cq);
+ c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
+ grpc_slice_from_static_string("/service/method"),
+ nullptr, gpr_inf_future(GPR_CLOCK_REALTIME),
+ nullptr);
+ GPR_ASSERT(c);
+
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+ grpc_metadata_array_init(&request_metadata_recv);
+ grpc_call_details_init(&call_details);
+
+ grpc_metadata compression_md = gzip_compression_override();
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 1;
+ op->data.send_initial_metadata.metadata = &compression_md;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message.send_message = request_payload;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(1),
+ nullptr);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ error =
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.cq, f.cq, tag(101));
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
+ cq_verify(cqv);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op->data.recv_close_on_server.cancelled = &was_cancelled;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message.recv_message = &recv_payload;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ if (minimal_stack) {
+ /* Expect the RPC to proceed normally for a minimal stack */
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op->data.send_status_from_server.trailing_metadata_count = 0;
+ op->data.send_status_from_server.status = GRPC_STATUS_OK;
+ status_details = grpc_slice_from_static_string("xyz");
+ op->data.send_status_from_server.status_details = &status_details;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ }
+ error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(102),
+ nullptr);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
+ CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
+ cq_verify(cqv);
+
+ GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
+ if (minimal_stack) {
+ /* We do not perform message size checks for minimal stack. */
+ GPR_ASSERT(status == GRPC_STATUS_OK);
+ } else {
+ GPR_ASSERT(was_cancelled == 1);
+ GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
+ GPR_ASSERT(grpc_slice_str_cmp(
+ details, "Received message larger than max (29 vs. 5)") ==
+ 0);
+ }
+ grpc_slice_unref(details);
+ grpc_slice_unref(request_payload_slice);
+ grpc_metadata_array_destroy(&initial_metadata_recv);
+ grpc_metadata_array_destroy(&trailing_metadata_recv);
+ grpc_metadata_array_destroy(&request_metadata_recv);
+ grpc_call_details_destroy(&call_details);
+ grpc_byte_buffer_destroy(request_payload);
+ grpc_byte_buffer_destroy(recv_payload);
+ grpc_call_unref(c);
+ if (s != nullptr) grpc_call_unref(s);
+ cq_verifier_destroy(cqv);
+
+ end_test(&f);
+ config.tear_down_data(&f);
+}
+
+// Test receive message limit with compressed response larger than the limit.
+static void test_max_receive_message_length_on_compressed_response(
+ grpc_end2end_test_config config, bool minimal_stack) {
+ gpr_log(GPR_INFO,
+ "testing max receive message length on compressed response with "
+ "minimal_stack=%d",
+ minimal_stack);
+ grpc_end2end_test_fixture f;
+ grpc_call* c = nullptr;
+ grpc_call* s = nullptr;
+ cq_verifier* cqv;
+ grpc_op ops[6];
+ grpc_op* op;
+ grpc_slice response_payload_slice = grpc_slice_malloc(1024);
+ memset(GRPC_SLICE_START_PTR(response_payload_slice), 'a', 1024);
+ grpc_byte_buffer* response_payload =
+ grpc_raw_byte_buffer_create(&response_payload_slice, 1);
+ grpc_byte_buffer* recv_payload = nullptr;
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ grpc_metadata_array request_metadata_recv;
+ grpc_call_details call_details;
+ grpc_status_code status;
+ grpc_call_error error;
+ grpc_slice details;
+ int was_cancelled = 2;
+
+ // Set limit via channel args.
+ grpc_arg arg[2];
+ arg[0] = grpc_channel_arg_integer_create(
+ const_cast(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), 5);
+ arg[1] = grpc_channel_arg_integer_create(
+ const_cast(GRPC_ARG_MINIMAL_STACK), minimal_stack);
+ grpc_channel_args* client_args =
+ grpc_channel_args_copy_and_add(nullptr, arg, 2);
+
+ f = begin_test(config, "test_max_response_message_length", client_args,
+ nullptr);
+ {
+ grpc_core::ExecCtx exec_ctx;
+ grpc_channel_args_destroy(client_args);
+ }
+ cqv = cq_verifier_create(f.cq);
+
+ c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
+ grpc_slice_from_static_string("/service/method"),
+ nullptr, gpr_inf_future(GPR_CLOCK_REALTIME),
+ nullptr);
+ GPR_ASSERT(c);
+
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+ grpc_metadata_array_init(&request_metadata_recv);
+ grpc_call_details_init(&call_details);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message.recv_message = &recv_payload;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(1),
+ nullptr);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ error =
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.cq, f.cq, tag(101));
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
+ cq_verify(cqv);
+
+ grpc_metadata compression_md = gzip_compression_override();
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 1;
+ op->data.send_initial_metadata.metadata = &compression_md;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op->data.recv_close_on_server.cancelled = &was_cancelled;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message.send_message = response_payload;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op->data.send_status_from_server.trailing_metadata_count = 0;
+ op->data.send_status_from_server.status = GRPC_STATUS_OK;
+ grpc_slice status_details = grpc_slice_from_static_string("xyz");
+ op->data.send_status_from_server.status_details = &status_details;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(102),
+ nullptr);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
+ CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
+ cq_verify(cqv);
+
+ GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
+ if (minimal_stack) {
+ /* We do not perform message size checks for minimal stack. */
+ GPR_ASSERT(status == GRPC_STATUS_OK);
+ } else {
+ GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
+ GPR_ASSERT(grpc_slice_str_cmp(
+ details, "Received message larger than max (29 vs. 5)") ==
+ 0);
+ }
+ grpc_slice_unref(details);
+ grpc_slice_unref(response_payload_slice);
+ grpc_metadata_array_destroy(&initial_metadata_recv);
+ grpc_metadata_array_destroy(&trailing_metadata_recv);
+ grpc_metadata_array_destroy(&request_metadata_recv);
+ grpc_call_details_destroy(&call_details);
+ grpc_byte_buffer_destroy(response_payload);
+ grpc_byte_buffer_destroy(recv_payload);
+
grpc_call_unref(c);
if (s != nullptr) grpc_call_unref(s);
@@ -500,6 +823,15 @@ void max_message_length(grpc_end2end_test_config config) {
test_max_message_length_on_response(config, false /* send_limit */,
true /* use_service_config */,
true /* use_string_json_value */);
+ /* The following tests are not useful for inproc transport and do not work
+ * with our simple proxy. */
+ if (strcmp(config.name, "inproc") != 0 &&
+ (config.feature_mask & FEATURE_MASK_SUPPORTS_REQUEST_PROXYING) == 0) {
+ test_max_receive_message_length_on_compressed_request(config, false);
+ test_max_receive_message_length_on_compressed_request(config, true);
+ test_max_receive_message_length_on_compressed_response(config, false);
+ test_max_receive_message_length_on_compressed_response(config, true);
+ }
}
void max_message_length_pre_init(void) {}
diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc
index 803ad38f1c5..b4a70272ea6 100644
--- a/test/cpp/microbenchmarks/bm_call_create.cc
+++ b/test/cpp/microbenchmarks/bm_call_create.cc
@@ -529,9 +529,10 @@ static void BM_IsolatedFilter(benchmark::State& state) {
grpc_call_final_info final_info;
TestOp test_op_data;
const int kArenaSize = 4096;
+ grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};
grpc_call_element_args call_args{call_stack,
nullptr,
- nullptr,
+ context,
method,
start_time,
deadline,
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index e69d345dfbb..91791882299 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -1163,6 +1163,7 @@ src/core/ext/filters/client_channel/server_address.h \
src/core/ext/filters/client_channel/service_config.cc \
src/core/ext/filters/client_channel/service_config.h \
src/core/ext/filters/client_channel/service_config_call_data.h \
+src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \
src/core/ext/filters/client_channel/service_config_parser.cc \
src/core/ext/filters/client_channel/service_config_parser.h \
src/core/ext/filters/client_channel/subchannel.cc \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index bff4f23c4a4..eacee0658f7 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -963,6 +963,7 @@ src/core/ext/filters/client_channel/server_address.h \
src/core/ext/filters/client_channel/service_config.cc \
src/core/ext/filters/client_channel/service_config.h \
src/core/ext/filters/client_channel/service_config_call_data.h \
+src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \
src/core/ext/filters/client_channel/service_config_parser.cc \
src/core/ext/filters/client_channel/service_config_parser.h \
src/core/ext/filters/client_channel/subchannel.cc \