Revert "Revert "Add message-size check before message decompression""

pull/23279/head
Yash Tibrewal 4 years ago committed by GitHub
parent 59dc7fb277
commit e98eaa5052
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 1
      BUILD.gn
  3. 2
      CMakeLists.txt
  4. 2
      Makefile
  5. 2
      build_autogenerated.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 1
      gRPC-Core.podspec
  9. 1
      grpc.gemspec
  10. 2
      grpc.gyp
  11. 1
      package.xml
  12. 142
      src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
  13. 3
      src/core/ext/filters/http/http_filters_plugin.cc
  14. 92
      src/core/ext/filters/http/message_compress/message_decompress_filter.cc
  15. 4
      src/core/ext/filters/http/message_compress/message_decompress_filter.h
  16. 106
      src/core/ext/filters/message_size/message_size_filter.cc
  17. 6
      src/core/ext/filters/message_size/message_size_filter.h
  18. 4
      src/core/plugin_registry/grpc_plugin_registry.cc
  19. 4
      src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
  20. 1
      src/python/grpcio/grpc_core_dependencies.py
  21. 332
      test/core/end2end/tests/max_message_length.cc
  22. 3
      test/cpp/microbenchmarks/bm_call_create.cc
  23. 1
      tools/doxygen/Doxyfile.c++.internal
  24. 1
      tools/doxygen/Doxyfile.core.internal

@ -1045,6 +1045,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/retry_throttle.cc",
"src/core/ext/filters/client_channel/server_address.cc",
"src/core/ext/filters/client_channel/service_config.cc",
"src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc",
"src/core/ext/filters/client_channel/service_config_parser.cc",
"src/core/ext/filters/client_channel/subchannel.cc",
"src/core/ext/filters/client_channel/subchannel_pool_interface.cc",
@ -1186,6 +1187,7 @@ grpc_cc_library(
language = "c++",
deps = [
"grpc_base",
"grpc_message_size_filter",
],
)

@ -295,6 +295,7 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/service_config.cc",
"src/core/ext/filters/client_channel/service_config.h",
"src/core/ext/filters/client_channel/service_config_call_data.h",
"src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc",
"src/core/ext/filters/client_channel/service_config_parser.cc",
"src/core/ext/filters/client_channel/service_config_parser.h",
"src/core/ext/filters/client_channel/subchannel.cc",

@ -1376,6 +1376,7 @@ add_library(grpc
src/core/ext/filters/client_channel/retry_throttle.cc
src/core/ext/filters/client_channel/server_address.cc
src/core/ext/filters/client_channel/service_config.cc
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
src/core/ext/filters/client_channel/service_config_parser.cc
src/core/ext/filters/client_channel/subchannel.cc
src/core/ext/filters/client_channel/subchannel_pool_interface.cc
@ -2048,6 +2049,7 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/retry_throttle.cc
src/core/ext/filters/client_channel/server_address.cc
src/core/ext/filters/client_channel/service_config.cc
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
src/core/ext/filters/client_channel/service_config_parser.cc
src/core/ext/filters/client_channel/subchannel.cc
src/core/ext/filters/client_channel/subchannel_pool_interface.cc

@ -3679,6 +3679,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/retry_throttle.cc \
src/core/ext/filters/client_channel/server_address.cc \
src/core/ext/filters/client_channel/service_config.cc \
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \
src/core/ext/filters/client_channel/service_config_parser.cc \
src/core/ext/filters/client_channel/subchannel.cc \
src/core/ext/filters/client_channel/subchannel_pool_interface.cc \
@ -4325,6 +4326,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/retry_throttle.cc \
src/core/ext/filters/client_channel/server_address.cc \
src/core/ext/filters/client_channel/service_config.cc \
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \
src/core/ext/filters/client_channel/service_config_parser.cc \
src/core/ext/filters/client_channel/subchannel.cc \
src/core/ext/filters/client_channel/subchannel_pool_interface.cc \

@ -795,6 +795,7 @@ libs:
- src/core/ext/filters/client_channel/retry_throttle.cc
- src/core/ext/filters/client_channel/server_address.cc
- src/core/ext/filters/client_channel/service_config.cc
- src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
- src/core/ext/filters/client_channel/service_config_parser.cc
- src/core/ext/filters/client_channel/subchannel.cc
- src/core/ext/filters/client_channel/subchannel_pool_interface.cc
@ -1656,6 +1657,7 @@ libs:
- src/core/ext/filters/client_channel/retry_throttle.cc
- src/core/ext/filters/client_channel/server_address.cc
- src/core/ext/filters/client_channel/service_config.cc
- src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
- src/core/ext/filters/client_channel/service_config_parser.cc
- src/core/ext/filters/client_channel/subchannel.cc
- src/core/ext/filters/client_channel/subchannel_pool_interface.cc

@ -93,6 +93,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/retry_throttle.cc \
src/core/ext/filters/client_channel/server_address.cc \
src/core/ext/filters/client_channel/service_config.cc \
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \
src/core/ext/filters/client_channel/service_config_parser.cc \
src/core/ext/filters/client_channel/subchannel.cc \
src/core/ext/filters/client_channel/subchannel_pool_interface.cc \

@ -62,6 +62,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\retry_throttle.cc " +
"src\\core\\ext\\filters\\client_channel\\server_address.cc " +
"src\\core\\ext\\filters\\client_channel\\service_config.cc " +
"src\\core\\ext\\filters\\client_channel\\service_config_channel_arg_filter.cc " +
"src\\core\\ext\\filters\\client_channel\\service_config_parser.cc " +
"src\\core\\ext\\filters\\client_channel\\subchannel.cc " +
"src\\core\\ext\\filters\\client_channel\\subchannel_pool_interface.cc " +

@ -279,6 +279,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/service_config.cc',
'src/core/ext/filters/client_channel/service_config.h',
'src/core/ext/filters/client_channel/service_config_call_data.h',
'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc',
'src/core/ext/filters/client_channel/service_config_parser.cc',
'src/core/ext/filters/client_channel/service_config_parser.h',
'src/core/ext/filters/client_channel/subchannel.cc',

@ -201,6 +201,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/service_config.cc )
s.files += %w( src/core/ext/filters/client_channel/service_config.h )
s.files += %w( src/core/ext/filters/client_channel/service_config_call_data.h )
s.files += %w( src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc )
s.files += %w( src/core/ext/filters/client_channel/service_config_parser.cc )
s.files += %w( src/core/ext/filters/client_channel/service_config_parser.h )
s.files += %w( src/core/ext/filters/client_channel/subchannel.cc )

@ -488,6 +488,7 @@
'src/core/ext/filters/client_channel/retry_throttle.cc',
'src/core/ext/filters/client_channel/server_address.cc',
'src/core/ext/filters/client_channel/service_config.cc',
'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc',
'src/core/ext/filters/client_channel/service_config_parser.cc',
'src/core/ext/filters/client_channel/subchannel.cc',
'src/core/ext/filters/client_channel/subchannel_pool_interface.cc',
@ -996,6 +997,7 @@
'src/core/ext/filters/client_channel/retry_throttle.cc',
'src/core/ext/filters/client_channel/server_address.cc',
'src/core/ext/filters/client_channel/service_config.cc',
'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc',
'src/core/ext/filters/client_channel/service_config_parser.cc',
'src/core/ext/filters/client_channel/subchannel.cc',
'src/core/ext/filters/client_channel/subchannel_pool_interface.cc',

@ -181,6 +181,7 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/service_config.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/service_config.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/service_config_call_data.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/service_config_parser.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/service_config_parser.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/subchannel.cc" role="src" />

@ -0,0 +1,142 @@
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This filter reads GRPC_ARG_SERVICE_CONFIG and populates ServiceConfigCallData
// in the call context per call for direct channels.
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/service_config_call_data.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/surface/channel_init.h"
namespace grpc_core {
namespace {
class ServiceConfigChannelArgChannelData {
public:
explicit ServiceConfigChannelArgChannelData(
const grpc_channel_element_args* args) {
const char* service_config_str = grpc_channel_args_find_string(
args->channel_args, GRPC_ARG_SERVICE_CONFIG);
if (service_config_str != nullptr) {
grpc_error* service_config_error = GRPC_ERROR_NONE;
auto service_config =
ServiceConfig::Create(service_config_str, &service_config_error);
if (service_config_error == GRPC_ERROR_NONE) {
service_config_ = std::move(service_config);
} else {
gpr_log(GPR_ERROR, "%s", grpc_error_string(service_config_error));
}
GRPC_ERROR_UNREF(service_config_error);
}
}
RefCountedPtr<ServiceConfig> service_config() const {
return service_config_;
}
private:
RefCountedPtr<ServiceConfig> service_config_;
};
class ServiceConfigChannelArgCallData {
public:
ServiceConfigChannelArgCallData(grpc_call_element* elem,
const grpc_call_element_args* args) {
ServiceConfigChannelArgChannelData* chand =
static_cast<ServiceConfigChannelArgChannelData*>(elem->channel_data);
RefCountedPtr<ServiceConfig> service_config = chand->service_config();
if (service_config != nullptr) {
GPR_DEBUG_ASSERT(args->context != nullptr);
const auto* method_params_vector =
service_config->GetMethodParsedConfigVector(args->path);
args->arena->New<ServiceConfigCallData>(
std::move(service_config), method_params_vector, args->context);
}
}
};
grpc_error* ServiceConfigChannelArgInitCallElem(
grpc_call_element* elem, const grpc_call_element_args* args) {
ServiceConfigChannelArgCallData* calld =
static_cast<ServiceConfigChannelArgCallData*>(elem->call_data);
new (calld) ServiceConfigChannelArgCallData(elem, args);
return GRPC_ERROR_NONE;
}
void ServiceConfigChannelArgDestroyCallElem(
grpc_call_element* elem, const grpc_call_final_info* /* final_info */,
grpc_closure* /* then_schedule_closure */) {
ServiceConfigChannelArgCallData* calld =
static_cast<ServiceConfigChannelArgCallData*>(elem->call_data);
calld->~ServiceConfigChannelArgCallData();
}
grpc_error* ServiceConfigChannelArgInitChannelElem(
grpc_channel_element* elem, grpc_channel_element_args* args) {
ServiceConfigChannelArgChannelData* chand =
static_cast<ServiceConfigChannelArgChannelData*>(elem->channel_data);
new (chand) ServiceConfigChannelArgChannelData(args);
return GRPC_ERROR_NONE;
}
void ServiceConfigChannelArgDestroyChannelElem(grpc_channel_element* elem) {
ServiceConfigChannelArgChannelData* chand =
static_cast<ServiceConfigChannelArgChannelData*>(elem->channel_data);
chand->~ServiceConfigChannelArgChannelData();
}
const grpc_channel_filter ServiceConfigChannelArgFilter = {
grpc_call_next_op,
grpc_channel_next_op,
sizeof(ServiceConfigChannelArgCallData),
ServiceConfigChannelArgInitCallElem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
ServiceConfigChannelArgDestroyCallElem,
sizeof(ServiceConfigChannelArgChannelData),
ServiceConfigChannelArgInitChannelElem,
ServiceConfigChannelArgDestroyChannelElem,
grpc_channel_next_get_info,
"service_config_channel_arg"};
bool maybe_add_service_config_channel_arg_filter(
grpc_channel_stack_builder* builder, void* /* arg */) {
const grpc_channel_args* channel_args =
grpc_channel_stack_builder_get_channel_arguments(builder);
if (grpc_channel_args_want_minimal_stack(channel_args) ||
grpc_channel_args_find_string(channel_args, GRPC_ARG_SERVICE_CONFIG) ==
nullptr) {
return true;
}
return grpc_channel_stack_builder_prepend_filter(
builder, &ServiceConfigChannelArgFilter, nullptr, nullptr);
}
} // namespace
} // namespace grpc_core
void grpc_service_config_channel_arg_filter_init(void) {
grpc_channel_init_register_stage(
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_core::maybe_add_service_config_channel_arg_filter, nullptr);
}
void grpc_service_config_channel_arg_filter_shutdown(void) {}

@ -38,7 +38,8 @@ static optional_filter compress_filter = {
&grpc_message_compress_filter, GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION};
static optional_filter decompress_filter = {
&grpc_message_decompress_filter, GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION};
&grpc_core::MessageDecompressFilter,
GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION};
static bool is_building_http_like_transport(
grpc_channel_stack_builder* builder) {

@ -18,6 +18,8 @@
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/http/message_compress/message_decompress_filter.h"
#include <assert.h>
#include <string.h>
@ -27,7 +29,8 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/ext/filters/http/message_compress/message_decompress_filter.h"
#include "absl/strings/str_format.h"
#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/compression/algorithm_metadata.h"
#include "src/core/lib/compression/compression_args.h"
@ -37,14 +40,25 @@
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
namespace grpc_core {
namespace {
class ChannelData {};
class ChannelData {
public:
explicit ChannelData(const grpc_channel_element_args* args)
: max_recv_size_(GetMaxRecvSizeFromChannelArgs(args->channel_args)) {}
int max_recv_size() const { return max_recv_size_; }
private:
int max_recv_size_;
};
class CallData {
public:
explicit CallData(const grpc_call_element_args& args)
: call_combiner_(args.call_combiner) {
CallData(const grpc_call_element_args& args, const ChannelData* chand)
: call_combiner_(args.call_combiner),
max_recv_message_length_(chand->max_recv_size()) {
// Initialize state for recv_initial_metadata_ready callback
GRPC_CLOSURE_INIT(&on_recv_initial_metadata_ready_,
OnRecvInitialMetadataReady, this,
@ -59,6 +73,13 @@ class CallData {
GRPC_CLOSURE_INIT(&on_recv_trailing_metadata_ready_,
OnRecvTrailingMetadataReady, this,
grpc_schedule_on_exec_ctx);
const MessageSizeParsedConfig* limits =
MessageSizeParsedConfig::GetFromCallContext(args.context);
if (limits != nullptr && limits->limits().max_recv_size >= 0 &&
(limits->limits().max_recv_size < max_recv_message_length_ ||
max_recv_message_length_ < 0)) {
max_recv_message_length_ = limits->limits().max_recv_size;
}
}
~CallData() { grpc_slice_buffer_destroy_internal(&recv_slices_); }
@ -82,7 +103,7 @@ class CallData {
void MaybeResumeOnRecvTrailingMetadataReady();
static void OnRecvTrailingMetadataReady(void* arg, grpc_error* error);
grpc_core::CallCombiner* call_combiner_;
CallCombiner* call_combiner_;
// Overall error for the call
grpc_error* error_ = GRPC_ERROR_NONE;
// Fields for handling recv_initial_metadata_ready callback
@ -91,17 +112,18 @@ class CallData {
grpc_metadata_batch* recv_initial_metadata_ = nullptr;
// Fields for handling recv_message_ready callback
bool seen_recv_message_ready_ = false;
int max_recv_message_length_;
grpc_message_compression_algorithm algorithm_ = GRPC_MESSAGE_COMPRESS_NONE;
grpc_closure on_recv_message_ready_;
grpc_closure* original_recv_message_ready_ = nullptr;
grpc_closure on_recv_message_next_done_;
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message_ = nullptr;
OrphanablePtr<ByteStream>* recv_message_ = nullptr;
// recv_slices_ holds the slices read from the original recv_message stream.
// It is initialized during construction and reset when a new stream is
// created using it.
grpc_slice_buffer recv_slices_;
std::aligned_storage<sizeof(grpc_core::SliceBufferByteStream),
alignof(grpc_core::SliceBufferByteStream)>::type
std::aligned_storage<sizeof(SliceBufferByteStream),
alignof(SliceBufferByteStream)>::type
recv_replacement_stream_;
// Fields for handling recv_trailing_metadata_ready callback
bool seen_recv_trailing_metadata_ready_ = false;
@ -139,7 +161,7 @@ void CallData::OnRecvInitialMetadataReady(void* arg, grpc_error* error) {
calld->MaybeResumeOnRecvTrailingMetadataReady();
grpc_closure* closure = calld->original_recv_initial_metadata_ready_;
calld->original_recv_initial_metadata_ready_ = nullptr;
grpc_core::Closure::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(error));
Closure::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(error));
}
void CallData::MaybeResumeOnRecvMessageReady() {
@ -170,6 +192,19 @@ void CallData::OnRecvMessageReady(void* arg, grpc_error* error) {
0) {
return calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_NONE);
}
if (calld->max_recv_message_length_ >= 0 &&
(*calld->recv_message_)->length() >
static_cast<uint32_t>(calld->max_recv_message_length_)) {
std::string message_string = absl::StrFormat(
"Received message larger than max (%u vs. %d)",
(*calld->recv_message_)->length(), calld->max_recv_message_length_);
GPR_DEBUG_ASSERT(calld->error_ == GRPC_ERROR_NONE);
calld->error_ = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string.c_str()),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED);
return calld->ContinueRecvMessageReadyCallback(
GRPC_ERROR_REF(calld->error_));
}
grpc_slice_buffer_destroy_internal(&calld->recv_slices_);
grpc_slice_buffer_init(&calld->recv_slices_);
return calld->ContinueReadingRecvMessage();
@ -241,9 +276,9 @@ void CallData::FinishRecvMessage() {
// Initializing recv_replacement_stream_ with decompressed_slices removes
// all the slices from decompressed_slices leaving it empty.
new (&recv_replacement_stream_)
grpc_core::SliceBufferByteStream(&decompressed_slices, recv_flags);
recv_message_->reset(reinterpret_cast<grpc_core::SliceBufferByteStream*>(
&recv_replacement_stream_));
SliceBufferByteStream(&decompressed_slices, recv_flags);
recv_message_->reset(
reinterpret_cast<SliceBufferByteStream*>(&recv_replacement_stream_));
recv_message_ = nullptr;
}
ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error_));
@ -254,7 +289,7 @@ void CallData::ContinueRecvMessageReadyCallback(grpc_error* error) {
// The surface will clean up the receiving stream if there is an error.
grpc_closure* closure = original_recv_message_ready_;
original_recv_message_ready_ = nullptr;
grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
Closure::Run(DEBUG_LOCATION, closure, error);
}
void CallData::MaybeResumeOnRecvTrailingMetadataReady() {
@ -283,7 +318,7 @@ void CallData::OnRecvTrailingMetadataReady(void* arg, grpc_error* error) {
calld->error_ = GRPC_ERROR_NONE;
grpc_closure* closure = calld->original_recv_trailing_metadata_ready_;
calld->original_recv_trailing_metadata_ready_ = nullptr;
grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
Closure::Run(DEBUG_LOCATION, closure, error);
}
void CallData::DecompressStartTransportStreamOpBatch(
@ -322,37 +357,44 @@ void DecompressStartTransportStreamOpBatch(
calld->DecompressStartTransportStreamOpBatch(elem, batch);
}
static grpc_error* DecompressInitCallElem(grpc_call_element* elem,
const grpc_call_element_args* args) {
new (elem->call_data) CallData(*args);
grpc_error* DecompressInitCallElem(grpc_call_element* elem,
const grpc_call_element_args* args) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
new (elem->call_data) CallData(*args, chand);
return GRPC_ERROR_NONE;
}
static void DecompressDestroyCallElem(
grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
void DecompressDestroyCallElem(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
CallData* calld = static_cast<CallData*>(elem->call_data);
calld->~CallData();
}
static grpc_error* DecompressInitChannelElem(
grpc_channel_element* /*elem*/, grpc_channel_element_args* /*args*/) {
grpc_error* DecompressInitChannelElem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
new (chand) ChannelData(args);
return GRPC_ERROR_NONE;
}
void DecompressDestroyChannelElem(grpc_channel_element* /*elem*/) {}
void DecompressDestroyChannelElem(grpc_channel_element* elem) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
chand->~ChannelData();
}
} // namespace
const grpc_channel_filter grpc_message_decompress_filter = {
const grpc_channel_filter MessageDecompressFilter = {
DecompressStartTransportStreamOpBatch,
grpc_channel_next_op,
sizeof(CallData),
DecompressInitCallElem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
DecompressDestroyCallElem,
0, // sizeof(ChannelData)
sizeof(ChannelData),
DecompressInitChannelElem,
DecompressDestroyChannelElem,
grpc_channel_next_get_info,
"message_decompress"};
} // namespace grpc_core

@ -23,7 +23,9 @@
#include "src/core/lib/channel/channel_stack.h"
extern const grpc_channel_filter grpc_message_decompress_filter;
namespace grpc_core {
extern const grpc_channel_filter MessageDecompressFilter;
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H \
*/

@ -45,6 +45,25 @@ namespace {
size_t g_message_size_parser_index;
} // namespace
//
// MessageSizeParsedConfig
//
const MessageSizeParsedConfig* MessageSizeParsedConfig::GetFromCallContext(
const grpc_call_context_element* context) {
if (context == nullptr) return nullptr;
auto* svc_cfg_call_data = static_cast<ServiceConfigCallData*>(
context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
if (svc_cfg_call_data == nullptr) return nullptr;
return static_cast<const MessageSizeParsedConfig*>(
svc_cfg_call_data->GetMethodParsedConfig(
MessageSizeParser::ParserIndex()));
}
//
// MessageSizeParser
//
std::unique_ptr<ServiceConfigParser::ParsedConfig>
MessageSizeParser::ParsePerMethodParams(const Json& json, grpc_error** error) {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
@ -97,12 +116,26 @@ void MessageSizeParser::Register() {
}
size_t MessageSizeParser::ParserIndex() { return g_message_size_parser_index; }
int GetMaxRecvSizeFromChannelArgs(const grpc_channel_args* args) {
if (grpc_channel_args_want_minimal_stack(args)) return -1;
return grpc_channel_args_find_integer(
args, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH,
{GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, -1, INT_MAX});
}
int GetMaxSendSizeFromChannelArgs(const grpc_channel_args* args) {
if (grpc_channel_args_want_minimal_stack(args)) return -1;
return grpc_channel_args_find_integer(
args, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH,
{GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, -1, INT_MAX});
}
} // namespace grpc_core
namespace {
struct channel_data {
grpc_core::MessageSizeParsedConfig::message_size_limits limits;
grpc_core::RefCountedPtr<grpc_core::ServiceConfig> svc_cfg;
};
struct call_data {
@ -118,24 +151,8 @@ struct call_data {
// Note: Per-method config is only available on the client, so we
// apply the max request size to the send limit and the max response
// size to the receive limit.
const grpc_core::MessageSizeParsedConfig* limits = nullptr;
grpc_core::ServiceConfigCallData* svc_cfg_call_data = nullptr;
if (args.context != nullptr) {
svc_cfg_call_data = static_cast<grpc_core::ServiceConfigCallData*>(
args.context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
}
if (svc_cfg_call_data != nullptr) {
limits = static_cast<const grpc_core::MessageSizeParsedConfig*>(
svc_cfg_call_data->GetMethodParsedConfig(
grpc_core::MessageSizeParser::ParserIndex()));
} else if (chand.svc_cfg != nullptr) {
const auto* objs_vector =
chand.svc_cfg->GetMethodParsedConfigVector(args.path);
if (objs_vector != nullptr) {
limits = static_cast<const grpc_core::MessageSizeParsedConfig*>(
(*objs_vector)[grpc_core::MessageSizeParser::ParserIndex()].get());
}
}
const grpc_core::MessageSizeParsedConfig* limits =
grpc_core::MessageSizeParsedConfig::GetFromCallContext(args.context);
if (limits != nullptr) {
if (limits->limits().max_send_size >= 0 &&
(limits->limits().max_send_size < this->limits.max_send_size ||
@ -288,35 +305,11 @@ static void message_size_destroy_call_elem(
calld->~call_data();
}
static int default_size(const grpc_channel_args* args,
int without_minimal_stack) {
if (grpc_channel_args_want_minimal_stack(args)) {
return -1;
}
return without_minimal_stack;
}
grpc_core::MessageSizeParsedConfig::message_size_limits get_message_size_limits(
const grpc_channel_args* channel_args) {
grpc_core::MessageSizeParsedConfig::message_size_limits lim;
lim.max_send_size =
default_size(channel_args, GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH);
lim.max_recv_size =
default_size(channel_args, GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH);
for (size_t i = 0; i < channel_args->num_args; ++i) {
if (strcmp(channel_args->args[i].key, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) ==
0) {
const grpc_integer_options options = {lim.max_send_size, -1, INT_MAX};
lim.max_send_size =
grpc_channel_arg_get_integer(&channel_args->args[i], options);
}
if (strcmp(channel_args->args[i].key,
GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) {
const grpc_integer_options options = {lim.max_recv_size, -1, INT_MAX};
lim.max_recv_size =
grpc_channel_arg_get_integer(&channel_args->args[i], options);
}
}
lim.max_send_size = grpc_core::GetMaxSendSizeFromChannelArgs(channel_args);
lim.max_recv_size = grpc_core::GetMaxRecvSizeFromChannelArgs(channel_args);
return lim;
}
@ -327,26 +320,6 @@ static grpc_error* message_size_init_channel_elem(
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
new (chand) channel_data();
chand->limits = get_message_size_limits(args->channel_args);
// TODO(yashykt): We only need to read GRPC_ARG_SERVICE_CONFIG in the case of
// direct channels. (Service config is otherwise stored in the call_context by
// client_channel filter.) If we ever need a second filter that also needs to
// parse GRPC_ARG_SERVICE_CONFIG, we should refactor this code and add a
// separate filter that reads GRPC_ARG_SERVICE_CONFIG and saves the parsed
// config in the call_context.
const grpc_arg* channel_arg =
grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG);
const char* service_config_str = grpc_channel_arg_get_string(channel_arg);
if (service_config_str != nullptr) {
grpc_error* service_config_error = GRPC_ERROR_NONE;
auto svc_cfg = grpc_core::ServiceConfig::Create(service_config_str,
&service_config_error);
if (service_config_error == GRPC_ERROR_NONE) {
chand->svc_cfg = std::move(svc_cfg);
} else {
gpr_log(GPR_ERROR, "%s", grpc_error_string(service_config_error));
}
GRPC_ERROR_UNREF(service_config_error);
}
return GRPC_ERROR_NONE;
}
@ -387,6 +360,9 @@ static bool maybe_add_message_size_filter(grpc_channel_stack_builder* builder,
void* /*arg*/) {
const grpc_channel_args* channel_args =
grpc_channel_stack_builder_get_channel_arguments(builder);
if (grpc_channel_args_want_minimal_stack(channel_args)) {
return true;
}
bool enable = false;
grpc_core::MessageSizeParsedConfig::message_size_limits lim =
get_message_size_limits(channel_args);

@ -40,6 +40,9 @@ class MessageSizeParsedConfig : public ServiceConfigParser::ParsedConfig {
const message_size_limits& limits() const { return limits_; }
static const MessageSizeParsedConfig* GetFromCallContext(
const grpc_call_context_element* context);
private:
message_size_limits limits_;
};
@ -54,6 +57,9 @@ class MessageSizeParser : public ServiceConfigParser::Parser {
static size_t ParserIndex();
};
int GetMaxRecvSizeFromChannelArgs(const grpc_channel_args* args);
int GetMaxSendSizeFromChannelArgs(const grpc_channel_args* args);
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_MESSAGE_SIZE_MESSAGE_SIZE_FILTER_H */

@ -64,6 +64,8 @@ void grpc_max_age_filter_init(void);
void grpc_max_age_filter_shutdown(void);
void grpc_message_size_filter_init(void);
void grpc_message_size_filter_shutdown(void);
void grpc_service_config_channel_arg_filter_init(void);
void grpc_service_config_channel_arg_filter_shutdown(void);
void grpc_client_authority_filter_init(void);
void grpc_client_authority_filter_shutdown(void);
void grpc_workaround_cronet_compression_filter_init(void);
@ -114,6 +116,8 @@ void grpc_register_built_in_plugins(void) {
grpc_max_age_filter_shutdown);
grpc_register_plugin(grpc_message_size_filter_init,
grpc_message_size_filter_shutdown);
grpc_register_plugin(grpc_service_config_channel_arg_filter_init,
grpc_service_config_channel_arg_filter_shutdown);
grpc_register_plugin(grpc_client_authority_filter_init,
grpc_client_authority_filter_shutdown);
grpc_register_plugin(grpc_workaround_cronet_compression_filter_init,

@ -64,6 +64,8 @@ void grpc_max_age_filter_init(void);
void grpc_max_age_filter_shutdown(void);
void grpc_message_size_filter_init(void);
void grpc_message_size_filter_shutdown(void);
void grpc_service_config_channel_arg_filter_init(void);
void grpc_service_config_channel_arg_filter_shutdown(void);
void grpc_client_authority_filter_init(void);
void grpc_client_authority_filter_shutdown(void);
void grpc_workaround_cronet_compression_filter_init(void);
@ -114,6 +116,8 @@ void grpc_register_built_in_plugins(void) {
grpc_max_age_filter_shutdown);
grpc_register_plugin(grpc_message_size_filter_init,
grpc_message_size_filter_shutdown);
grpc_register_plugin(grpc_service_config_channel_arg_filter_init,
grpc_service_config_channel_arg_filter_shutdown);
grpc_register_plugin(grpc_client_authority_filter_init,
grpc_client_authority_filter_shutdown);
grpc_register_plugin(grpc_workaround_cronet_compression_filter_init,

@ -71,6 +71,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/retry_throttle.cc',
'src/core/ext/filters/client_channel/server_address.cc',
'src/core/ext/filters/client_channel/service_config.cc',
'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc',
'src/core/ext/filters/client_channel/service_config_parser.cc',
'src/core/ext/filters/client_channel/subchannel.cc',
'src/core/ext/filters/client_channel/subchannel_pool_interface.cc',

@ -29,6 +29,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/transport/metadata.h"
#include "test/core/end2end/cq_verifier.h"
@ -466,6 +467,328 @@ static void test_max_message_length_on_response(grpc_end2end_test_config config,
grpc_byte_buffer_destroy(response_payload);
grpc_byte_buffer_destroy(recv_payload);
grpc_call_unref(c);
if (s != nullptr) grpc_call_unref(s);
cq_verifier_destroy(cqv);
end_test(&f);
config.tear_down_data(&f);
}
static grpc_metadata gzip_compression_override() {
grpc_metadata gzip_compression_override;
gzip_compression_override.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
gzip_compression_override.value = grpc_slice_from_static_string("gzip");
memset(&gzip_compression_override.internal_data, 0,
sizeof(gzip_compression_override.internal_data));
return gzip_compression_override;
}
// Test receive message limit with compressed request larger than the limit
static void test_max_receive_message_length_on_compressed_request(
grpc_end2end_test_config config, bool minimal_stack) {
gpr_log(GPR_INFO,
"test max receive message length on compressed request with "
"minimal_stack=%d",
minimal_stack);
grpc_end2end_test_fixture f;
grpc_call* c = nullptr;
grpc_call* s = nullptr;
cq_verifier* cqv;
grpc_op ops[6];
grpc_op* op;
grpc_slice request_payload_slice = grpc_slice_malloc(1024);
memset(GRPC_SLICE_START_PTR(request_payload_slice), 'a', 1024);
grpc_byte_buffer* request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer* recv_payload = nullptr;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_call_details call_details;
grpc_status_code status;
grpc_call_error error;
grpc_slice details, status_details;
int was_cancelled = 2;
// Set limit via channel args.
grpc_arg arg[2];
arg[0] = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), 5);
arg[1] = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_MINIMAL_STACK), minimal_stack);
grpc_channel_args* server_args =
grpc_channel_args_copy_and_add(nullptr, arg, 2);
f = begin_test(config, "test_max_request_message_length", nullptr,
server_args);
{
grpc_core::ExecCtx exec_ctx;
grpc_channel_args_destroy(server_args);
}
cqv = cq_verifier_create(f.cq);
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/service/method"),
nullptr, gpr_inf_future(GPR_CLOCK_REALTIME),
nullptr);
GPR_ASSERT(c);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
grpc_metadata compression_md = gzip_compression_override();
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 1;
op->data.send_initial_metadata.metadata = &compression_md;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
cq_verify(cqv);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &recv_payload;
op->flags = 0;
op->reserved = nullptr;
op++;
if (minimal_stack) {
/* Expect the RPC to proceed normally for a minimal stack */
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
status_details = grpc_slice_from_static_string("xyz");
op->data.send_status_from_server.status_details = &status_details;
op->flags = 0;
op->reserved = nullptr;
op++;
}
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
cq_verify(cqv);
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
if (minimal_stack) {
/* We do not perform message size checks for minimal stack. */
GPR_ASSERT(status == GRPC_STATUS_OK);
} else {
GPR_ASSERT(was_cancelled == 1);
GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
GPR_ASSERT(grpc_slice_str_cmp(
details, "Received message larger than max (29 vs. 5)") ==
0);
}
grpc_slice_unref(details);
grpc_slice_unref(request_payload_slice);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(recv_payload);
grpc_call_unref(c);
if (s != nullptr) grpc_call_unref(s);
cq_verifier_destroy(cqv);
end_test(&f);
config.tear_down_data(&f);
}
// Test receive message limit with compressed response larger than the limit.
static void test_max_receive_message_length_on_compressed_response(
grpc_end2end_test_config config, bool minimal_stack) {
gpr_log(GPR_INFO,
"testing max receive message length on compressed response with "
"minimal_stack=%d",
minimal_stack);
grpc_end2end_test_fixture f;
grpc_call* c = nullptr;
grpc_call* s = nullptr;
cq_verifier* cqv;
grpc_op ops[6];
grpc_op* op;
grpc_slice response_payload_slice = grpc_slice_malloc(1024);
memset(GRPC_SLICE_START_PTR(response_payload_slice), 'a', 1024);
grpc_byte_buffer* response_payload =
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
grpc_byte_buffer* recv_payload = nullptr;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_call_details call_details;
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
int was_cancelled = 2;
// Set limit via channel args.
grpc_arg arg[2];
arg[0] = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), 5);
arg[1] = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_MINIMAL_STACK), minimal_stack);
grpc_channel_args* client_args =
grpc_channel_args_copy_and_add(nullptr, arg, 2);
f = begin_test(config, "test_max_response_message_length", client_args,
nullptr);
{
grpc_core::ExecCtx exec_ctx;
grpc_channel_args_destroy(client_args);
}
cqv = cq_verifier_create(f.cq);
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/service/method"),
nullptr, gpr_inf_future(GPR_CLOCK_REALTIME),
nullptr);
GPR_ASSERT(c);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &recv_payload;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
cq_verify(cqv);
grpc_metadata compression_md = gzip_compression_override();
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 1;
op->data.send_initial_metadata.metadata = &compression_md;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = response_payload;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
grpc_slice status_details = grpc_slice_from_static_string("xyz");
op->data.send_status_from_server.status_details = &status_details;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
cq_verify(cqv);
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
if (minimal_stack) {
/* We do not perform message size checks for minimal stack. */
GPR_ASSERT(status == GRPC_STATUS_OK);
} else {
GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
GPR_ASSERT(grpc_slice_str_cmp(
details, "Received message larger than max (29 vs. 5)") ==
0);
}
grpc_slice_unref(details);
grpc_slice_unref(response_payload_slice);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_byte_buffer_destroy(response_payload);
grpc_byte_buffer_destroy(recv_payload);
grpc_call_unref(c);
if (s != nullptr) grpc_call_unref(s);
@ -500,6 +823,15 @@ void max_message_length(grpc_end2end_test_config config) {
test_max_message_length_on_response(config, false /* send_limit */,
true /* use_service_config */,
true /* use_string_json_value */);
/* The following tests are not useful for inproc transport and do not work
* with our simple proxy. */
if (strcmp(config.name, "inproc") != 0 &&
(config.feature_mask & FEATURE_MASK_SUPPORTS_REQUEST_PROXYING) == 0) {
test_max_receive_message_length_on_compressed_request(config, false);
test_max_receive_message_length_on_compressed_request(config, true);
test_max_receive_message_length_on_compressed_response(config, false);
test_max_receive_message_length_on_compressed_response(config, true);
}
}
void max_message_length_pre_init(void) {}

@ -529,9 +529,10 @@ static void BM_IsolatedFilter(benchmark::State& state) {
grpc_call_final_info final_info;
TestOp test_op_data;
const int kArenaSize = 4096;
grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};
grpc_call_element_args call_args{call_stack,
nullptr,
nullptr,
context,
method,
start_time,
deadline,

@ -1164,6 +1164,7 @@ src/core/ext/filters/client_channel/server_address.h \
src/core/ext/filters/client_channel/service_config.cc \
src/core/ext/filters/client_channel/service_config.h \
src/core/ext/filters/client_channel/service_config_call_data.h \
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \
src/core/ext/filters/client_channel/service_config_parser.cc \
src/core/ext/filters/client_channel/service_config_parser.h \
src/core/ext/filters/client_channel/subchannel.cc \

@ -965,6 +965,7 @@ src/core/ext/filters/client_channel/server_address.h \
src/core/ext/filters/client_channel/service_config.cc \
src/core/ext/filters/client_channel/service_config.h \
src/core/ext/filters/client_channel/service_config_call_data.h \
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \
src/core/ext/filters/client_channel/service_config_parser.cc \
src/core/ext/filters/client_channel/service_config_parser.h \
src/core/ext/filters/client_channel/subchannel.cc \

Loading…
Cancel
Save