Reland: [promises] Compression filter conversion (#31686)

* Revert "Revert "[promises] Compression filter conversion (#31204)" (#31682)"

This reverts commit fa31b36cb1.

* fix?

* fix
pull/31688/head^2
Craig Tiller 2 years ago committed by GitHub
parent cf666c4c20
commit c545350633
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      BUILD
  2. 76
      CMakeLists.txt
  3. 6
      Makefile
  4. 37
      build_autogenerated.yaml
  5. 3
      config.m4
  6. 3
      config.w32
  7. 6
      gRPC-C++.podspec
  8. 11
      gRPC-Core.podspec
  9. 6
      grpc.gemspec
  10. 7
      grpc.gyp
  11. 6
      package.xml
  12. 50
      src/core/ext/filters/http/http_filters_plugin.cc
  13. 315
      src/core/ext/filters/http/message_compress/compression_filter.cc
  14. 133
      src/core/ext/filters/http/message_compress/compression_filter.h
  15. 332
      src/core/ext/filters/http/message_compress/message_compress_filter.cc
  16. 52
      src/core/ext/filters/http/message_compress/message_compress_filter.h
  17. 322
      src/core/ext/filters/http/message_compress/message_decompress_filter.cc
  18. 32
      src/core/ext/filters/http/message_compress/message_decompress_filter.h
  19. 3
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  20. 9
      src/core/lib/channel/connected_channel.cc
  21. 99
      src/core/lib/channel/promise_based_filter.cc
  22. 6
      src/core/lib/channel/promise_based_filter.h
  23. 38
      src/core/lib/compression/compression_internal.cc
  24. 6
      src/core/lib/compression/compression_internal.h
  25. 12
      src/core/lib/gprpp/bitset.h
  26. 5
      src/core/lib/surface/server.cc
  27. 5
      src/core/lib/transport/metadata_batch.cc
  28. 3
      src/python/grpcio/grpc_core_dependencies.py
  29. 3
      test/core/channel/BUILD
  30. 186
      test/core/channel/minimal_stack_is_minimal_test.cc
  31. 11
      test/core/compression/BUILD
  32. 103
      test/core/compression/args_utils.cc
  33. 34
      test/core/compression/args_utils.h
  34. 69
      test/core/compression/compression_test.cc
  35. 38
      test/core/end2end/fixtures/h2_compress.cc
  36. 18
      test/core/end2end/fuzzers/client_fuzzer.cc
  37. 2
      test/core/end2end/generate_tests.bzl
  38. 79
      test/core/end2end/tests/compressed_payload.cc
  39. 82
      test/core/end2end/tests/max_message_length.cc
  40. 2
      test/core/end2end/tests/streaming_error_response.cc
  41. 16
      test/cpp/microbenchmarks/bm_call_create.cc
  42. 6
      tools/doxygen/Doxyfile.c++.internal
  43. 6
      tools/doxygen/Doxyfile.core.internal
  44. 22
      tools/profiling/microbenchmarks/bm_diff/bm_diff.py
  45. 48
      tools/run_tests/generated/tests.json

12
BUILD

@ -3114,14 +3114,12 @@ grpc_cc_library(
srcs = [
"//src/core:ext/filters/http/client/http_client_filter.cc",
"//src/core:ext/filters/http/http_filters_plugin.cc",
"//src/core:ext/filters/http/message_compress/message_compress_filter.cc",
"//src/core:ext/filters/http/message_compress/message_decompress_filter.cc",
"//src/core:ext/filters/http/message_compress/compression_filter.cc",
"//src/core:ext/filters/http/server/http_server_filter.cc",
],
hdrs = [
"//src/core:ext/filters/http/client/http_client_filter.h",
"//src/core:ext/filters/http/message_compress/message_compress_filter.h",
"//src/core:ext/filters/http/message_compress/message_decompress_filter.h",
"//src/core:ext/filters/http/message_compress/compression_filter.h",
"//src/core:ext/filters/http/server/http_server_filter.h",
],
external_deps = [
@ -3138,7 +3136,6 @@ grpc_cc_library(
deps = [
"channel_stack_builder",
"config",
"debug_location",
"gpr",
"grpc_base",
"grpc_public_hdrs",
@ -3151,16 +3148,17 @@ grpc_cc_library(
"//src/core:channel_init",
"//src/core:channel_stack_type",
"//src/core:context",
"//src/core:error",
"//src/core:for_each",
"//src/core:grpc_message_size_filter",
"//src/core:latch",
"//src/core:map_pipe",
"//src/core:percent_encoding",
"//src/core:seq",
"//src/core:slice",
"//src/core:slice_buffer",
"//src/core:status_helper",
"//src/core:transport_fwd",
"//src/core:try_concurrently",
"//src/core:try_seq",
],
)

76
CMakeLists.txt generated

@ -774,7 +774,6 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_c goaway_server_test)
add_dependencies(buildtests_c inproc_callback_test)
add_dependencies(buildtests_c invalid_call_argument_test)
add_dependencies(buildtests_c minimal_stack_is_minimal_test)
add_dependencies(buildtests_c multiple_server_queues_test)
add_dependencies(buildtests_c no_server_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX OR _gRPC_PLATFORM_WINDOWS)
@ -1033,6 +1032,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx message_compress_test)
add_dependencies(buildtests_cxx message_size_service_config_test)
add_dependencies(buildtests_cxx metadata_map_test)
add_dependencies(buildtests_cxx minimal_stack_is_minimal_test)
add_dependencies(buildtests_cxx miscompile_with_no_unique_address_test)
add_dependencies(buildtests_cxx mock_stream_test)
add_dependencies(buildtests_cxx mock_test)
@ -1354,7 +1354,6 @@ endif()
if(gRPC_BUILD_TESTS)
add_library(end2end_tests
test/core/compression/args_utils.cc
test/core/end2end/cq_verifier.cc
test/core/end2end/data/client_certs.cc
test/core/end2end/data/server1_cert.cc
@ -1722,8 +1721,7 @@ add_library(grpc
src/core/ext/filters/http/client/http_client_filter.cc
src/core/ext/filters/http/client_authority_filter.cc
src/core/ext/filters/http/http_filters_plugin.cc
src/core/ext/filters/http/message_compress/message_compress_filter.cc
src/core/ext/filters/http/message_compress/message_decompress_filter.cc
src/core/ext/filters/http/message_compress/compression_filter.cc
src/core/ext/filters/http/server/http_server_filter.cc
src/core/ext/filters/message_size/message_size_filter.cc
src/core/ext/filters/rbac/rbac_filter.cc
@ -2678,8 +2676,7 @@ add_library(grpc_unsecure
src/core/ext/filters/http/client/http_client_filter.cc
src/core/ext/filters/http/client_authority_filter.cc
src/core/ext/filters/http/http_filters_plugin.cc
src/core/ext/filters/http/message_compress/message_compress_filter.cc
src/core/ext/filters/http/message_compress/message_decompress_filter.cc
src/core/ext/filters/http/message_compress/compression_filter.cc
src/core/ext/filters/http/server/http_server_filter.cc
src/core/ext/filters/message_size/message_size_filter.cc
src/core/ext/transport/chttp2/client/chttp2_connector.cc
@ -5094,35 +5091,6 @@ target_link_libraries(invalid_call_argument_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(minimal_stack_is_minimal_test
test/core/channel/minimal_stack_is_minimal_test.cc
)
target_include_directories(minimal_stack_is_minimal_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
)
target_link_libraries(minimal_stack_is_minimal_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)
@ -8882,7 +8850,6 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(compression_test
test/core/compression/args_utils.cc
test/core/compression/compression_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -14762,6 +14729,43 @@ target_link_libraries(metadata_map_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(minimal_stack_is_minimal_test
test/core/channel/minimal_stack_is_minimal_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(minimal_stack_is_minimal_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(minimal_stack_is_minimal_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)

6
Makefile generated

@ -1023,8 +1023,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/http/client/http_client_filter.cc \
src/core/ext/filters/http/client_authority_filter.cc \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.cc \
src/core/ext/filters/http/message_compress/message_decompress_filter.cc \
src/core/ext/filters/http/message_compress/compression_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/rbac/rbac_filter.cc \
@ -1838,8 +1837,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/http/client/http_client_filter.cc \
src/core/ext/filters/http/client_authority_filter.cc \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.cc \
src/core/ext/filters/http/message_compress/message_decompress_filter.cc \
src/core/ext/filters/http/message_compress/compression_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/transport/chttp2/client/chttp2_connector.cc \

@ -17,7 +17,6 @@ libs:
language: c
public_headers: []
headers:
- test/core/compression/args_utils.h
- test/core/end2end/cq_verifier.h
- test/core/end2end/data/ssl_test_data.h
- test/core/end2end/end2end_tests.h
@ -27,7 +26,6 @@ libs:
- test/core/end2end/tests/cancel_test_helpers.h
- test/core/util/test_lb_policies.h
src:
- test/core/compression/args_utils.cc
- test/core/end2end/cq_verifier.cc
- test/core/end2end/data/client_certs.cc
- test/core/end2end/data/server1_cert.cc
@ -366,8 +364,7 @@ libs:
- src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h
- src/core/ext/filters/http/client/http_client_filter.h
- src/core/ext/filters/http/client_authority_filter.h
- src/core/ext/filters/http/message_compress/message_compress_filter.h
- src/core/ext/filters/http/message_compress/message_decompress_filter.h
- src/core/ext/filters/http/message_compress/compression_filter.h
- src/core/ext/filters/http/server/http_server_filter.h
- src/core/ext/filters/message_size/message_size_filter.h
- src/core/ext/filters/rbac/rbac_filter.h
@ -1111,8 +1108,7 @@ libs:
- src/core/ext/filters/http/client/http_client_filter.cc
- src/core/ext/filters/http/client_authority_filter.cc
- src/core/ext/filters/http/http_filters_plugin.cc
- src/core/ext/filters/http/message_compress/message_compress_filter.cc
- src/core/ext/filters/http/message_compress/message_decompress_filter.cc
- src/core/ext/filters/http/message_compress/compression_filter.cc
- src/core/ext/filters/http/server/http_server_filter.cc
- src/core/ext/filters/message_size/message_size_filter.cc
- src/core/ext/filters/rbac/rbac_filter.cc
@ -1938,8 +1934,7 @@ libs:
- src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h
- src/core/ext/filters/http/client/http_client_filter.h
- src/core/ext/filters/http/client_authority_filter.h
- src/core/ext/filters/http/message_compress/message_compress_filter.h
- src/core/ext/filters/http/message_compress/message_decompress_filter.h
- src/core/ext/filters/http/message_compress/compression_filter.h
- src/core/ext/filters/http/server/http_server_filter.h
- src/core/ext/filters/message_size/message_size_filter.h
- src/core/ext/transport/chttp2/client/chttp2_connector.h
@ -2324,8 +2319,7 @@ libs:
- src/core/ext/filters/http/client/http_client_filter.cc
- src/core/ext/filters/http/client_authority_filter.cc
- src/core/ext/filters/http/http_filters_plugin.cc
- src/core/ext/filters/http/message_compress/message_compress_filter.cc
- src/core/ext/filters/http/message_compress/message_decompress_filter.cc
- src/core/ext/filters/http/message_compress/compression_filter.cc
- src/core/ext/filters/http/server/http_server_filter.cc
- src/core/ext/filters/message_size/message_size_filter.cc
- src/core/ext/transport/chttp2/client/chttp2_connector.cc
@ -4152,15 +4146,6 @@ targets:
- test/core/end2end/invalid_call_argument_test.cc
deps:
- grpc_test_util
- name: minimal_stack_is_minimal_test
build: test
language: c
headers: []
src:
- test/core/channel/minimal_stack_is_minimal_test.cc
deps:
- grpc_test_util
uses_polling: false
- name: multiple_server_queues_test
build: test
language: c
@ -5921,10 +5906,8 @@ targets:
gtest: true
build: test
language: c++
headers:
- test/core/compression/args_utils.h
headers: []
src:
- test/core/compression/args_utils.cc
- test/core/compression/compression_test.cc
deps:
- grpc_test_util
@ -9095,6 +9078,16 @@ targets:
- test/core/util/tracer_util.cc
deps:
- grpc_test_util
- name: minimal_stack_is_minimal_test
gtest: true
build: test
language: c++
headers: []
src:
- test/core/channel/minimal_stack_is_minimal_test.cc
deps:
- grpc_test_util
uses_polling: false
- name: miscompile_with_no_unique_address_test
gtest: true
build: test

3
config.m4 generated

@ -105,8 +105,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/http/client/http_client_filter.cc \
src/core/ext/filters/http/client_authority_filter.cc \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.cc \
src/core/ext/filters/http/message_compress/message_decompress_filter.cc \
src/core/ext/filters/http/message_compress/compression_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/rbac/rbac_filter.cc \

3
config.w32 generated

@ -71,8 +71,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\http\\client\\http_client_filter.cc " +
"src\\core\\ext\\filters\\http\\client_authority_filter.cc " +
"src\\core\\ext\\filters\\http\\http_filters_plugin.cc " +
"src\\core\\ext\\filters\\http\\message_compress\\message_compress_filter.cc " +
"src\\core\\ext\\filters\\http\\message_compress\\message_decompress_filter.cc " +
"src\\core\\ext\\filters\\http\\message_compress\\compression_filter.cc " +
"src\\core\\ext\\filters\\http\\server\\http_server_filter.cc " +
"src\\core\\ext\\filters\\message_size\\message_size_filter.cc " +
"src\\core\\ext\\filters\\rbac\\rbac_filter.cc " +

6
gRPC-C++.podspec generated

@ -275,8 +275,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h',
'src/core/ext/filters/http/client/http_client_filter.h',
'src/core/ext/filters/http/client_authority_filter.h',
'src/core/ext/filters/http/message_compress/message_compress_filter.h',
'src/core/ext/filters/http/message_compress/message_decompress_filter.h',
'src/core/ext/filters/http/message_compress/compression_filter.h',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/message_size/message_size_filter.h',
'src/core/ext/filters/rbac/rbac_filter.h',
@ -1180,8 +1179,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h',
'src/core/ext/filters/http/client/http_client_filter.h',
'src/core/ext/filters/http/client_authority_filter.h',
'src/core/ext/filters/http/message_compress/message_compress_filter.h',
'src/core/ext/filters/http/message_compress/message_decompress_filter.h',
'src/core/ext/filters/http/message_compress/compression_filter.h',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/message_size/message_size_filter.h',
'src/core/ext/filters/rbac/rbac_filter.h',

11
gRPC-Core.podspec generated

@ -311,10 +311,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/client_authority_filter.cc',
'src/core/ext/filters/http/client_authority_filter.h',
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/message_compress_filter.cc',
'src/core/ext/filters/http/message_compress/message_compress_filter.h',
'src/core/ext/filters/http/message_compress/message_decompress_filter.cc',
'src/core/ext/filters/http/message_compress/message_decompress_filter.h',
'src/core/ext/filters/http/message_compress/compression_filter.cc',
'src/core/ext/filters/http/message_compress/compression_filter.h',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/message_size/message_size_filter.cc',
@ -1841,8 +1839,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h',
'src/core/ext/filters/http/client/http_client_filter.h',
'src/core/ext/filters/http/client_authority_filter.h',
'src/core/ext/filters/http/message_compress/message_compress_filter.h',
'src/core/ext/filters/http/message_compress/message_decompress_filter.h',
'src/core/ext/filters/http/message_compress/compression_filter.h',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/message_size/message_size_filter.h',
'src/core/ext/filters/rbac/rbac_filter.h',
@ -2641,8 +2638,6 @@ Pod::Spec.new do |s|
'src/core/lib/security/authorization/grpc_authorization_policy_provider.h',
'src/core/lib/security/authorization/rbac_translator.cc',
'src/core/lib/security/authorization/rbac_translator.h',
'test/core/compression/args_utils.cc',
'test/core/compression/args_utils.h',
'test/core/end2end/cq_verifier.cc',
'test/core/end2end/cq_verifier.h',
'test/core/end2end/data/client_certs.cc',

6
grpc.gemspec generated

@ -222,10 +222,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/http/client_authority_filter.cc )
s.files += %w( src/core/ext/filters/http/client_authority_filter.h )
s.files += %w( src/core/ext/filters/http/http_filters_plugin.cc )
s.files += %w( src/core/ext/filters/http/message_compress/message_compress_filter.cc )
s.files += %w( src/core/ext/filters/http/message_compress/message_compress_filter.h )
s.files += %w( src/core/ext/filters/http/message_compress/message_decompress_filter.cc )
s.files += %w( src/core/ext/filters/http/message_compress/message_decompress_filter.h )
s.files += %w( src/core/ext/filters/http/message_compress/compression_filter.cc )
s.files += %w( src/core/ext/filters/http/message_compress/compression_filter.h )
s.files += %w( src/core/ext/filters/http/server/http_server_filter.cc )
s.files += %w( src/core/ext/filters/http/server/http_server_filter.h )
s.files += %w( src/core/ext/filters/message_size/message_size_filter.cc )

7
grpc.gyp generated

@ -181,7 +181,6 @@
'grpc_test_util',
],
'sources': [
'test/core/compression/args_utils.cc',
'test/core/end2end/cq_verifier.cc',
'test/core/end2end/data/client_certs.cc',
'test/core/end2end/data/server1_cert.cc',
@ -436,8 +435,7 @@
'src/core/ext/filters/http/client/http_client_filter.cc',
'src/core/ext/filters/http/client_authority_filter.cc',
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/message_compress_filter.cc',
'src/core/ext/filters/http/message_compress/message_decompress_filter.cc',
'src/core/ext/filters/http/message_compress/compression_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/rbac/rbac_filter.cc',
@ -1198,8 +1196,7 @@
'src/core/ext/filters/http/client/http_client_filter.cc',
'src/core/ext/filters/http/client_authority_filter.cc',
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/message_compress_filter.cc',
'src/core/ext/filters/http/message_compress/message_decompress_filter.cc',
'src/core/ext/filters/http/message_compress/compression_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/transport/chttp2/client/chttp2_connector.cc',

6
package.xml generated

@ -204,10 +204,8 @@
<file baseinstalldir="/" name="src/core/ext/filters/http/client_authority_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/client_authority_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/http_filters_plugin.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/message_compress_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/message_compress_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/message_decompress_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/message_decompress_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/compression_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/compression_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/server/http_server_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/server/http_server_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/message_size/message_size_filter.cc" role="src" />

@ -25,8 +25,7 @@
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/ext/filters/http/client/http_client_filter.h"
#include "src/core/ext/filters/http/message_compress/message_compress_filter.h"
#include "src/core/ext/filters/http/message_compress/message_decompress_filter.h"
#include "src/core/ext/filters/http/message_compress/compression_filter.h"
#include "src/core/ext/filters/http/server/http_server_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
@ -45,25 +44,24 @@ static bool is_building_http_like_transport(
namespace grpc_core {
void RegisterHttpFilters(CoreConfiguration::Builder* builder) {
auto optional = [builder](grpc_channel_stack_type channel_type,
bool enable_in_minimal_stack,
const char* control_channel_arg,
const grpc_channel_filter* filter) {
auto compression = [builder](grpc_channel_stack_type channel_type,
const grpc_channel_filter* filter) {
builder->channel_init()->RegisterStage(
channel_type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[enable_in_minimal_stack, control_channel_arg,
filter](ChannelStackBuilder* builder) {
[filter](ChannelStackBuilder* builder) {
if (!is_building_http_like_transport(builder)) return true;
auto args = builder->channel_args();
const bool enable = args.GetBool(control_channel_arg)
.value_or(enable_in_minimal_stack ||
!args.WantMinimalStack());
const bool enable =
args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION)
.value_or(true) ||
args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION)
.value_or(true);
if (enable) builder->PrependFilter(filter);
return true;
});
};
auto required = [builder](grpc_channel_stack_type channel_type,
const grpc_channel_filter* filter) {
auto http = [builder](grpc_channel_stack_type channel_type,
const grpc_channel_filter* filter) {
builder->channel_init()->RegisterStage(
channel_type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[filter](ChannelStackBuilder* builder) {
@ -73,25 +71,11 @@ void RegisterHttpFilters(CoreConfiguration::Builder* builder) {
return true;
});
};
// TODO(ctiller): return this flag to true once the promise conversion is
// complete.
static constexpr bool kMinimalStackHasDecompression = false;
optional(GRPC_CLIENT_SUBCHANNEL, false,
GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION,
&grpc_message_compress_filter);
optional(GRPC_CLIENT_DIRECT_CHANNEL, false,
GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION,
&grpc_message_compress_filter);
optional(GRPC_SERVER_CHANNEL, false, GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION,
&grpc_message_compress_filter);
optional(GRPC_CLIENT_SUBCHANNEL, kMinimalStackHasDecompression,
GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, &MessageDecompressFilter);
optional(GRPC_CLIENT_DIRECT_CHANNEL, kMinimalStackHasDecompression,
GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, &MessageDecompressFilter);
optional(GRPC_SERVER_CHANNEL, kMinimalStackHasDecompression,
GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, &MessageDecompressFilter);
required(GRPC_CLIENT_SUBCHANNEL, &HttpClientFilter::kFilter);
required(GRPC_CLIENT_DIRECT_CHANNEL, &HttpClientFilter::kFilter);
required(GRPC_SERVER_CHANNEL, &HttpServerFilter::kFilter);
compression(GRPC_CLIENT_SUBCHANNEL, &ClientCompressionFilter::kFilter);
compression(GRPC_CLIENT_DIRECT_CHANNEL, &ClientCompressionFilter::kFilter);
compression(GRPC_SERVER_CHANNEL, &ServerCompressionFilter::kFilter);
http(GRPC_CLIENT_SUBCHANNEL, &HttpClientFilter::kFilter);
http(GRPC_CLIENT_DIRECT_CHANNEL, &HttpClientFilter::kFilter);
http(GRPC_SERVER_CHANNEL, &HttpServerFilter::kFilter);
}
} // namespace grpc_core

@ -0,0 +1,315 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/http/message_compress/compression_filter.h"
#include <inttypes.h>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
#include "absl/meta/type_traits.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/types/optional.h"
#include <grpc/compression.h>
#include <grpc/impl/codegen/compression_types.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/for_each.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/map_pipe.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/try_concurrently.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
const grpc_channel_filter ClientCompressionFilter::kFilter =
MakePromiseBasedFilter<ClientCompressionFilter, FilterEndpoint::kClient,
kFilterExaminesServerInitialMetadata |
kFilterExaminesInboundMessages |
kFilterExaminesOutboundMessages>("compression");
const grpc_channel_filter ServerCompressionFilter::kFilter =
MakePromiseBasedFilter<ServerCompressionFilter, FilterEndpoint::kServer,
kFilterExaminesServerInitialMetadata |
kFilterExaminesInboundMessages |
kFilterExaminesOutboundMessages>("compression");
absl::StatusOr<ClientCompressionFilter> ClientCompressionFilter::Create(
const ChannelArgs& args, ChannelFilter::Args) {
return ClientCompressionFilter(args);
}
absl::StatusOr<ServerCompressionFilter> ServerCompressionFilter::Create(
const ChannelArgs& args, ChannelFilter::Args) {
return ServerCompressionFilter(args);
}
CompressionFilter::CompressionFilter(const ChannelArgs& args)
: max_recv_size_(GetMaxRecvSizeFromChannelArgs(args)),
message_size_service_config_parser_index_(
MessageSizeParser::ParserIndex()),
default_compression_algorithm_(
DefaultCompressionAlgorithmFromChannelArgs(args).value_or(
GRPC_COMPRESS_NONE)),
enabled_compression_algorithms_(
CompressionAlgorithmSet::FromChannelArgs(args)),
enable_compression_(
args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION).value_or(true)),
enable_decompression_(
args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION)
.value_or(true)) {
// Make sure the default is enabled.
if (!enabled_compression_algorithms_.IsSet(default_compression_algorithm_)) {
const char* name;
if (!grpc_compression_algorithm_name(default_compression_algorithm_,
&name)) {
name = "<unknown>";
}
gpr_log(GPR_ERROR,
"default compression algorithm %s not enabled: switching to none",
name);
default_compression_algorithm_ = GRPC_COMPRESS_NONE;
}
}
MessageHandle CompressionFilter::CompressMessage(
MessageHandle message, grpc_compression_algorithm algorithm) const {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
gpr_log(GPR_ERROR, "CompressMessage: len=%" PRIdPTR " alg=%d flags=%d",
message->payload()->Length(), algorithm, message->flags());
}
// Check if we're allowed to compress this message
// (apps might want to disable compression for certain messages to avoid
// crime/beast like vulns).
uint32_t& flags = message->mutable_flags();
if (algorithm == GRPC_COMPRESS_NONE || !enable_compression_ ||
(flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS))) {
return message;
}
// Try to compress the payload.
SliceBuffer tmp;
SliceBuffer* payload = message->payload();
bool did_compress = grpc_msg_compress(algorithm, payload->c_slice_buffer(),
tmp.c_slice_buffer());
// If we achieved compression send it as compressed, otherwise send it as (to
// avoid spending cycles on the receiver decompressing).
if (did_compress) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
const char* algo_name;
const size_t before_size = payload->Length();
const size_t after_size = tmp.Length();
const float savings_ratio = 1.0f - static_cast<float>(after_size) /
static_cast<float>(before_size);
GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name));
gpr_log(GPR_INFO,
"Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR
" bytes (%.2f%% savings)",
algo_name, before_size, after_size, 100 * savings_ratio);
}
tmp.Swap(payload);
flags |= GRPC_WRITE_INTERNAL_COMPRESS;
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
const char* algo_name;
GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name));
gpr_log(GPR_INFO,
"Algorithm '%s' enabled but decided not to compress. Input size: "
"%" PRIuPTR,
algo_name, payload->Length());
}
}
return message;
}
absl::StatusOr<MessageHandle> CompressionFilter::DecompressMessage(
MessageHandle message, grpc_compression_algorithm algorithm,
absl::optional<uint32_t> max_recv_message_length) const {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
gpr_log(GPR_ERROR, "DecompressMessage: len=%" PRIdPTR " max=%d alg=%d",
message->payload()->Length(), max_recv_message_length.value_or(-1),
algorithm);
}
// Check max message length.
if (max_recv_message_length.has_value() &&
message->payload()->Length() >
static_cast<size_t>(*max_recv_message_length)) {
return absl::ResourceExhaustedError(absl::StrFormat(
"Received message larger than max (%u vs. %d)",
message->payload()->Length(), *max_recv_message_length));
}
// Check if decompression is enabled (if not, we can just pass the message
// up).
if (!enable_decompression_ ||
(message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) == 0) {
return std::move(message);
}
// Try to decompress the payload.
SliceBuffer decompressed_slices;
if (grpc_msg_decompress(algorithm, message->payload()->c_slice_buffer(),
decompressed_slices.c_slice_buffer()) == 0) {
return absl::InternalError(
absl::StrCat("Unexpected error decompressing data for algorithm ",
CompressionAlgorithmAsString(algorithm)));
}
// Swap the decompressed slices into the message.
message->payload()->Swap(&decompressed_slices);
message->mutable_flags() &= ~GRPC_WRITE_INTERNAL_COMPRESS;
message->mutable_flags() |= GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED;
return std::move(message);
}
class CompressionFilter::DecompressLoop {
public:
explicit DecompressLoop(CompressionFilter* filter, CallArgs& call_args)
: filter_(filter),
mapper_(PipeMapper<MessageHandle>::Intercept(
*call_args.incoming_messages)) {}
// Once we have a compression algorithm we can construct the decompression
// loop.
// Returns a promise that resolves to MessageHandle.
auto TakeAndRun(grpc_compression_algorithm algorithm) {
// Configure max receive size.
auto max_recv_message_length = filter_->max_recv_size_;
const MessageSizeParsedConfig* limits =
MessageSizeParsedConfig::GetFromCallContext(
GetContext<grpc_call_context_element>(),
filter_->message_size_service_config_parser_index_);
if (limits != nullptr && limits->max_recv_size().has_value() &&
(!max_recv_message_length.has_value() ||
*limits->max_recv_size() < *max_recv_message_length)) {
max_recv_message_length = *limits->max_recv_size();
}
// Interject decompression into the message loop.
return mapper_.TakeAndRun([algorithm, max_recv_message_length,
filter = filter_](MessageHandle message) {
return filter->DecompressMessage(std::move(message), algorithm,
max_recv_message_length);
});
}
private:
CompressionFilter* filter_;
PipeMapper<MessageHandle> mapper_;
};
class CompressionFilter::CompressLoop {
public:
explicit CompressLoop(CompressionFilter* filter, CallArgs& call_args)
: filter_(filter),
mapper_(PipeMapper<MessageHandle>::Intercept(
*call_args.outgoing_messages)) {}
// Once we're ready to send initial metadata we can construct the compression
// loop.
// Returns a promise that resolves to MessageHandle.
auto TakeAndRun(grpc_metadata_batch& outgoing_metadata) {
const auto algorithm =
outgoing_metadata.Take(GrpcInternalEncodingRequest())
.value_or(filter_->default_compression_algorithm());
// Convey supported compression algorithms.
outgoing_metadata.Set(GrpcAcceptEncodingMetadata(),
filter_->enabled_compression_algorithms());
if (algorithm != GRPC_COMPRESS_NONE) {
outgoing_metadata.Set(GrpcEncodingMetadata(), algorithm);
}
// Interject compression into the message loop.
return mapper_.TakeAndRun([filter = filter_, algorithm](MessageHandle m) {
return filter->CompressMessage(std::move(m), algorithm);
});
}
private:
CompressionFilter* filter_;
PipeMapper<MessageHandle> mapper_;
};
ArenaPromise<ServerMetadataHandle> ClientCompressionFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto compress_loop = CompressLoop(this, call_args)
.TakeAndRun(*call_args.client_initial_metadata);
DecompressLoop decompress_loop(this, call_args);
auto* server_initial_metadata = call_args.server_initial_metadata;
// Concurrently:
// - call the next filter
// - wait for initial metadata from the server and then commence decompression
// - compress outgoing messages
return TryConcurrently(next_promise_factory(std::move(call_args)))
.NecessaryPull(Seq(server_initial_metadata->Wait(),
[decompress_loop = std::move(decompress_loop)](
ServerMetadata** server_initial_metadata) mutable
-> ArenaPromise<absl::Status> {
if (*server_initial_metadata == nullptr) {
return ImmediateOkStatus();
}
return decompress_loop.TakeAndRun(
(*server_initial_metadata)
->get(GrpcEncodingMetadata())
.value_or(GRPC_COMPRESS_NONE));
}))
.Push(std::move(compress_loop));
}
ArenaPromise<ServerMetadataHandle> ServerCompressionFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
CompressLoop compress_loop(this, call_args);
auto decompress_loop = DecompressLoop(this, call_args)
.TakeAndRun(call_args.client_initial_metadata
->get(GrpcEncodingMetadata())
.value_or(GRPC_COMPRESS_NONE));
auto* read_latch = GetContext<Arena>()->New<Latch<ServerMetadata*>>();
auto* write_latch =
std::exchange(call_args.server_initial_metadata, read_latch);
// Concurrently:
// - call the next filter
// - decompress incoming messages
// - wait for initial metadata to be sent, and then commence compression of
// outgoing messages
return TryConcurrently(next_promise_factory(std::move(call_args)))
.Pull(std::move(decompress_loop))
.Push(Seq(read_latch->Wait(),
[write_latch, compress_loop = std::move(compress_loop)](
ServerMetadata** md) mutable {
// Find the compression algorithm.
auto loop = compress_loop.TakeAndRun(**md);
write_latch->Set(*md);
return loop;
}));
}
} // namespace grpc_core

@ -0,0 +1,133 @@
//
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_COMPRESSION_FILTER_H
#define GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_COMPRESSION_FILTER_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include <grpc/impl/codegen/compression_types.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
/** Compression filter for messages.
*
* See <grpc/compression.h> for the available compression settings.
*
* Compression settings may come from:
* - Channel configuration, as established at channel creation time.
* - The metadata accompanying the outgoing data to be compressed. This is
* taken as a request only. We may choose not to honor it. The metadata key
* is given by \a GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY.
*
* Compression can be disabled for concrete messages (for instance in order to
* prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set in
* the MessageHandle flags.
*
* The attempted compression mechanism is added to the resulting initial
* metadata under the 'grpc-encoding' key.
*
* If compression is actually performed, the MessageHandle's flag is modified to
* incorporate GRPC_WRITE_INTERNAL_COMPRESS. Otherwise, and regardless of the
* aforementioned 'grpc-encoding' metadata value, data will pass through
* uncompressed. */
class CompressionFilter : public ChannelFilter {
protected:
explicit CompressionFilter(const ChannelArgs& args);
class CompressLoop;
class DecompressLoop;
grpc_compression_algorithm default_compression_algorithm() const {
return default_compression_algorithm_;
}
CompressionAlgorithmSet enabled_compression_algorithms() const {
return enabled_compression_algorithms_;
}
private:
// Compress one message synchronously.
MessageHandle CompressMessage(MessageHandle message,
grpc_compression_algorithm algorithm) const;
// Decompress one message synchronously.
absl::StatusOr<MessageHandle> DecompressMessage(
MessageHandle message, grpc_compression_algorithm algorithm,
absl::optional<uint32_t> max_recv_message_length) const;
// Max receive message length, if set.
absl::optional<uint32_t> max_recv_size_;
size_t message_size_service_config_parser_index_;
// The default, channel-level, compression algorithm.
grpc_compression_algorithm default_compression_algorithm_;
// Enabled compression algorithms.
CompressionAlgorithmSet enabled_compression_algorithms_;
// Is compression enabled?
bool enable_compression_;
// Is decompression enabled?
bool enable_decompression_;
};
class ClientCompressionFilter final : public CompressionFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<ClientCompressionFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
private:
using CompressionFilter::CompressionFilter;
};
class ServerCompressionFilter final : public CompressionFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<ServerCompressionFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
private:
using CompressionFilter::CompressionFilter;
};
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_COMPRESSION_FILTER_H \
*/

@ -1,332 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/http/message_compress/message_compress_filter.h"
#include <inttypes.h>
#include <stdlib.h>
#include <new>
#include <utility>
#include "absl/meta/type_traits.h"
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include <grpc/compression.h>
#include <grpc/impl/codegen/compression_types.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
namespace {
class ChannelData {
public:
explicit ChannelData(grpc_channel_element_args* args) {
// Get the enabled and the default algorithms from channel args.
enabled_compression_algorithms_ =
grpc_core::CompressionAlgorithmSet::FromChannelArgs(args->channel_args);
default_compression_algorithm_ =
grpc_core::DefaultCompressionAlgorithmFromChannelArgs(
args->channel_args)
.value_or(GRPC_COMPRESS_NONE);
// Make sure the default is enabled.
if (!enabled_compression_algorithms_.IsSet(
default_compression_algorithm_)) {
const char* name;
if (!grpc_compression_algorithm_name(default_compression_algorithm_,
&name)) {
name = "<unknown>";
}
gpr_log(GPR_ERROR,
"default compression algorithm %s not enabled: switching to none",
name);
default_compression_algorithm_ = GRPC_COMPRESS_NONE;
}
GPR_ASSERT(!args->is_last);
}
grpc_compression_algorithm default_compression_algorithm() const {
return default_compression_algorithm_;
}
grpc_core::CompressionAlgorithmSet enabled_compression_algorithms() const {
return enabled_compression_algorithms_;
}
private:
/** The default, channel-level, compression algorithm */
grpc_compression_algorithm default_compression_algorithm_;
/** Enabled compression algorithms */
grpc_core::CompressionAlgorithmSet enabled_compression_algorithms_;
};
class CallData {
public:
CallData(grpc_call_element* elem, const grpc_call_element_args& args)
: call_combiner_(args.call_combiner) {
ChannelData* channeld = static_cast<ChannelData*>(elem->channel_data);
// The call's message compression algorithm is set to channel's default
// setting. It can be overridden later by initial metadata.
if (GPR_LIKELY(channeld->enabled_compression_algorithms().IsSet(
channeld->default_compression_algorithm()))) {
compression_algorithm_ = channeld->default_compression_algorithm();
}
GRPC_CLOSURE_INIT(&forward_send_message_batch_in_call_combiner_,
ForwardSendMessageBatch, elem, grpc_schedule_on_exec_ctx);
}
~CallData() {}
void CompressStartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
private:
bool SkipMessageCompression();
void FinishSendMessage(grpc_call_element* elem);
void ProcessSendInitialMetadata(grpc_call_element* elem,
grpc_metadata_batch* initial_metadata);
// Methods for processing a send_message batch
static void FailSendMessageBatchInCallCombiner(void* calld_arg,
grpc_error_handle error);
static void ForwardSendMessageBatch(void* elem_arg, grpc_error_handle unused);
grpc_core::CallCombiner* call_combiner_;
grpc_compression_algorithm compression_algorithm_ = GRPC_COMPRESS_NONE;
grpc_error_handle cancel_error_;
grpc_transport_stream_op_batch* send_message_batch_ = nullptr;
bool seen_initial_metadata_ = false;
grpc_closure forward_send_message_batch_in_call_combiner_;
};
// Returns true if we should skip message compression for the current message.
bool CallData::SkipMessageCompression() {
// If the flags of this message indicate that it shouldn't be compressed, we
// skip message compression.
uint32_t flags = send_message_batch_->payload->send_message.flags;
if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) {
return true;
}
// If this call doesn't have any message compression algorithm set, skip
// message compression.
return compression_algorithm_ == GRPC_COMPRESS_NONE;
}
void CallData::ProcessSendInitialMetadata(
grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
ChannelData* channeld = static_cast<ChannelData*>(elem->channel_data);
// Find the compression algorithm.
compression_algorithm_ =
initial_metadata->Take(grpc_core::GrpcInternalEncodingRequest())
.value_or(channeld->default_compression_algorithm());
switch (compression_algorithm_) {
case GRPC_COMPRESS_NONE:
break;
case GRPC_COMPRESS_DEFLATE:
case GRPC_COMPRESS_GZIP:
initial_metadata->Set(grpc_core::GrpcEncodingMetadata(),
compression_algorithm_);
break;
case GRPC_COMPRESS_ALGORITHMS_COUNT:
abort();
}
// Convey supported compression algorithms.
initial_metadata->Set(grpc_core::GrpcAcceptEncodingMetadata(),
channeld->enabled_compression_algorithms());
}
void CallData::FinishSendMessage(grpc_call_element* elem) {
// Compress the data if appropriate.
if (!SkipMessageCompression()) {
grpc_core::SliceBuffer tmp;
uint32_t& send_flags = send_message_batch_->payload->send_message.flags;
grpc_core::SliceBuffer* payload =
send_message_batch_->payload->send_message.send_message;
bool did_compress =
grpc_msg_compress(compression_algorithm_, payload->c_slice_buffer(),
tmp.c_slice_buffer());
if (did_compress) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
const char* algo_name;
const size_t before_size = payload->Length();
const size_t after_size = tmp.Length();
const float savings_ratio = 1.0f - static_cast<float>(after_size) /
static_cast<float>(before_size);
GPR_ASSERT(grpc_compression_algorithm_name(compression_algorithm_,
&algo_name));
gpr_log(GPR_INFO,
"Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR
" bytes (%.2f%% savings)",
algo_name, before_size, after_size, 100 * savings_ratio);
}
tmp.Swap(payload);
send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
const char* algo_name;
GPR_ASSERT(grpc_compression_algorithm_name(compression_algorithm_,
&algo_name));
gpr_log(
GPR_INFO,
"Algorithm '%s' enabled but decided not to compress. Input size: "
"%" PRIuPTR,
algo_name, payload->Length());
}
}
}
grpc_call_next_op(elem, std::exchange(send_message_batch_, nullptr));
}
void CallData::FailSendMessageBatchInCallCombiner(void* calld_arg,
grpc_error_handle error) {
CallData* calld = static_cast<CallData*>(calld_arg);
if (calld->send_message_batch_ != nullptr) {
grpc_transport_stream_op_batch_finish_with_failure(
calld->send_message_batch_, error, calld->call_combiner_);
calld->send_message_batch_ = nullptr;
}
}
void CallData::ForwardSendMessageBatch(void* elem_arg,
grpc_error_handle /*unused*/) {
grpc_call_element* elem = static_cast<grpc_call_element*>(elem_arg);
CallData* calld = static_cast<CallData*>(elem->call_data);
calld->FinishSendMessage(elem);
}
void CallData::CompressStartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
// Handle cancel_stream.
if (batch->cancel_stream) {
cancel_error_ = batch->payload->cancel_stream.cancel_error;
if (send_message_batch_ != nullptr) {
if (!seen_initial_metadata_) {
GRPC_CALL_COMBINER_START(
call_combiner_,
GRPC_CLOSURE_CREATE(FailSendMessageBatchInCallCombiner, this,
grpc_schedule_on_exec_ctx),
cancel_error_, "failing send_message op");
}
}
} else if (!cancel_error_.ok()) {
grpc_transport_stream_op_batch_finish_with_failure(batch, cancel_error_,
call_combiner_);
return;
}
// Handle send_initial_metadata.
if (batch->send_initial_metadata) {
GPR_ASSERT(!seen_initial_metadata_);
ProcessSendInitialMetadata(
elem, batch->payload->send_initial_metadata.send_initial_metadata);
seen_initial_metadata_ = true;
// If we had previously received a batch containing a send_message op,
// handle it now. Note that we need to re-enter the call combiner
// for this, since we can't send two batches down while holding the
// call combiner, since the connected_channel filter (at the bottom of
// the call stack) will release the call combiner for each batch it sees.
if (send_message_batch_ != nullptr) {
GRPC_CALL_COMBINER_START(
call_combiner_, &forward_send_message_batch_in_call_combiner_,
absl::OkStatus(),
"starting send_message after send_initial_metadata");
}
}
// Handle send_message.
if (batch->send_message) {
GPR_ASSERT(send_message_batch_ == nullptr);
send_message_batch_ = batch;
// If we have not yet seen send_initial_metadata, then we have to
// wait. We save the batch and then drop the call combiner, which we'll
// have to pick up again later when we get send_initial_metadata.
if (!seen_initial_metadata_) {
GRPC_CALL_COMBINER_STOP(
call_combiner_, "send_message batch pending send_initial_metadata");
return;
}
FinishSendMessage(elem);
} else {
// Pass control down the stack.
grpc_call_next_op(elem, batch);
}
}
void CompressStartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
CallData* calld = static_cast<CallData*>(elem->call_data);
calld->CompressStartTransportStreamOpBatch(elem, batch);
}
/* Constructor for call_data */
grpc_error_handle CompressInitCallElem(grpc_call_element* elem,
const grpc_call_element_args* args) {
new (elem->call_data) CallData(elem, *args);
return absl::OkStatus();
}
/* Destructor for call_data */
void CompressDestroyCallElem(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
CallData* calld = static_cast<CallData*>(elem->call_data);
calld->~CallData();
}
/* Constructor for ChannelData */
grpc_error_handle CompressInitChannelElem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
new (elem->channel_data) ChannelData(args);
return absl::OkStatus();
}
/* Destructor for channel data */
void CompressDestroyChannelElem(grpc_channel_element* elem) {
ChannelData* channeld = static_cast<ChannelData*>(elem->channel_data);
channeld->~ChannelData();
}
} // namespace
const grpc_channel_filter grpc_message_compress_filter = {
CompressStartTransportStreamOpBatch,
nullptr,
grpc_channel_next_op,
sizeof(CallData),
CompressInitCallElem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
CompressDestroyCallElem,
sizeof(ChannelData),
CompressInitChannelElem,
grpc_channel_stack_no_post_init,
CompressDestroyChannelElem,
grpc_channel_next_get_info,
"message_compress"};

@ -1,52 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_COMPRESS_FILTER_H
#define GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_COMPRESS_FILTER_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
/** Compression filter for outgoing data.
*
* See <grpc/compression.h> for the available compression settings.
*
* Compression settings may come from:
* - Channel configuration, as established at channel creation time.
* - The metadata accompanying the outgoing data to be compressed. This is
* taken as a request only. We may choose not to honor it. The metadata key
* is given by \a GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY.
*
* Compression can be disabled for concrete messages (for instance in order to
* prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set in
* the BEGIN_MESSAGE flags.
*
* The attempted compression mechanism is added to the resulting initial
* metadata under the'grpc-encoding' key.
*
* If compression is actually performed, BEGIN_MESSAGE's flag is modified to
* incorporate GRPC_WRITE_INTERNAL_COMPRESS. Otherwise, and regardless of the
* aforementioned 'grpc-encoding' metadata value, data will pass through
* uncompressed. */
extern const grpc_channel_filter grpc_message_compress_filter;
#endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_COMPRESS_FILTER_H \
*/

@ -1,322 +0,0 @@
//
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/http/message_compress/message_decompress_filter.h"
#include <string.h>
#include <cstdint>
#include <new>
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/types/optional.h"
#include <grpc/impl/codegen/compression_types.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
namespace {
class ChannelData {
public:
explicit ChannelData(const grpc_channel_element_args* args)
: max_recv_size_(GetMaxRecvSizeFromChannelArgs(
ChannelArgs::FromC(args->channel_args))),
message_size_service_config_parser_index_(
MessageSizeParser::ParserIndex()) {}
absl::optional<uint32_t> max_recv_size() const { return max_recv_size_; }
size_t message_size_service_config_parser_index() const {
return message_size_service_config_parser_index_;
}
private:
absl::optional<uint32_t> max_recv_size_;
const size_t message_size_service_config_parser_index_;
};
class CallData {
public:
CallData(const grpc_call_element_args& args, const ChannelData* chand)
: call_combiner_(args.call_combiner),
max_recv_message_length_(chand->max_recv_size()) {
// Initialize state for recv_initial_metadata_ready callback
GRPC_CLOSURE_INIT(&on_recv_initial_metadata_ready_,
OnRecvInitialMetadataReady, this,
grpc_schedule_on_exec_ctx);
// Initialize state for recv_message_ready callback
GRPC_CLOSURE_INIT(&on_recv_message_ready_, OnRecvMessageReady, this,
grpc_schedule_on_exec_ctx);
// Initialize state for recv_trailing_metadata_ready callback
GRPC_CLOSURE_INIT(&on_recv_trailing_metadata_ready_,
OnRecvTrailingMetadataReady, this,
grpc_schedule_on_exec_ctx);
const MessageSizeParsedConfig* limits =
MessageSizeParsedConfig::GetFromCallContext(
args.context, chand->message_size_service_config_parser_index());
if (limits != nullptr && limits->max_recv_size().has_value() &&
(!max_recv_message_length_.has_value() ||
*limits->max_recv_size() < *max_recv_message_length_)) {
max_recv_message_length_ = *limits->max_recv_size();
}
}
void DecompressStartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
private:
static void OnRecvInitialMetadataReady(void* arg, grpc_error_handle error);
// Methods for processing a receive message event
void MaybeResumeOnRecvMessageReady();
static void OnRecvMessageReady(void* arg, grpc_error_handle error);
void ContinueRecvMessageReadyCallback(grpc_error_handle error);
// Methods for processing a recv_trailing_metadata event
void MaybeResumeOnRecvTrailingMetadataReady();
static void OnRecvTrailingMetadataReady(void* arg, grpc_error_handle error);
CallCombiner* call_combiner_;
// Overall error for the call
grpc_error_handle error_;
// Fields for handling recv_initial_metadata_ready callback
grpc_closure on_recv_initial_metadata_ready_;
grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
grpc_metadata_batch* recv_initial_metadata_ = nullptr;
// Fields for handling recv_message_ready callback
bool seen_recv_message_ready_ = false;
absl::optional<uint32_t> max_recv_message_length_;
grpc_compression_algorithm algorithm_ = GRPC_COMPRESS_NONE;
absl::optional<SliceBuffer>* recv_message_ = nullptr;
uint32_t* recv_message_flags_ = nullptr;
grpc_closure on_recv_message_ready_;
grpc_closure* original_recv_message_ready_ = nullptr;
// Fields for handling recv_trailing_metadata_ready callback
bool seen_recv_trailing_metadata_ready_ = false;
grpc_closure on_recv_trailing_metadata_ready_;
grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
grpc_error_handle on_recv_trailing_metadata_ready_error_;
};
void CallData::OnRecvInitialMetadataReady(void* arg, grpc_error_handle error) {
CallData* calld = static_cast<CallData*>(arg);
if (error.ok()) {
calld->algorithm_ =
calld->recv_initial_metadata_->get(GrpcEncodingMetadata())
.value_or(GRPC_COMPRESS_NONE);
}
calld->MaybeResumeOnRecvMessageReady();
calld->MaybeResumeOnRecvTrailingMetadataReady();
grpc_closure* closure = calld->original_recv_initial_metadata_ready_;
calld->original_recv_initial_metadata_ready_ = nullptr;
Closure::Run(DEBUG_LOCATION, closure, error);
}
void CallData::MaybeResumeOnRecvMessageReady() {
if (seen_recv_message_ready_) {
seen_recv_message_ready_ = false;
GRPC_CALL_COMBINER_START(call_combiner_, &on_recv_message_ready_,
absl::OkStatus(),
"continue recv_message_ready callback");
}
}
void CallData::OnRecvMessageReady(void* arg, grpc_error_handle error) {
CallData* calld = static_cast<CallData*>(arg);
if (error.ok()) {
if (calld->original_recv_initial_metadata_ready_ != nullptr) {
calld->seen_recv_message_ready_ = true;
GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
"Deferring OnRecvMessageReady until after "
"OnRecvInitialMetadataReady");
return;
}
if (calld->algorithm_ != GRPC_COMPRESS_NONE) {
// recv_message can be NULL if trailing metadata is received instead of
// message, or it's possible that the message was not compressed.
if (!calld->recv_message_->has_value() ||
(*calld->recv_message_)->Length() == 0 ||
((*calld->recv_message_flags_ & GRPC_WRITE_INTERNAL_COMPRESS) == 0)) {
return calld->ContinueRecvMessageReadyCallback(absl::OkStatus());
}
if (calld->max_recv_message_length_.has_value() &&
(*calld->recv_message_)->Length() >
static_cast<uint32_t>(*calld->max_recv_message_length_)) {
GPR_DEBUG_ASSERT(calld->error_.ok());
calld->error_ = grpc_error_set_int(
GRPC_ERROR_CREATE(
absl::StrFormat("Received message larger than max (%u vs. %d)",
(*calld->recv_message_)->Length(),
*calld->max_recv_message_length_)),
StatusIntProperty::kRpcStatus, GRPC_STATUS_RESOURCE_EXHAUSTED);
return calld->ContinueRecvMessageReadyCallback(calld->error_);
}
SliceBuffer decompressed_slices;
if (grpc_msg_decompress(calld->algorithm_,
(*calld->recv_message_)->c_slice_buffer(),
decompressed_slices.c_slice_buffer()) == 0) {
GPR_DEBUG_ASSERT(calld->error_.ok());
calld->error_ = GRPC_ERROR_CREATE(absl::StrCat(
"Unexpected error decompressing data for algorithm with "
"enum value ",
calld->algorithm_));
} else {
*calld->recv_message_flags_ =
(*calld->recv_message_flags_ & (~GRPC_WRITE_INTERNAL_COMPRESS)) |
GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED;
(*calld->recv_message_)->Swap(&decompressed_slices);
}
return calld->ContinueRecvMessageReadyCallback(calld->error_);
}
}
calld->ContinueRecvMessageReadyCallback(error);
}
void CallData::ContinueRecvMessageReadyCallback(grpc_error_handle error) {
MaybeResumeOnRecvTrailingMetadataReady();
// The surface will clean up the receiving stream if there is an error.
grpc_closure* closure = original_recv_message_ready_;
original_recv_message_ready_ = nullptr;
Closure::Run(DEBUG_LOCATION, closure, error);
}
void CallData::MaybeResumeOnRecvTrailingMetadataReady() {
if (seen_recv_trailing_metadata_ready_) {
seen_recv_trailing_metadata_ready_ = false;
grpc_error_handle error = on_recv_trailing_metadata_ready_error_;
on_recv_trailing_metadata_ready_error_ = absl::OkStatus();
GRPC_CALL_COMBINER_START(call_combiner_, &on_recv_trailing_metadata_ready_,
error, "Continuing OnRecvTrailingMetadataReady");
}
}
void CallData::OnRecvTrailingMetadataReady(void* arg, grpc_error_handle error) {
CallData* calld = static_cast<CallData*>(arg);
if (calld->original_recv_initial_metadata_ready_ != nullptr ||
calld->original_recv_message_ready_ != nullptr) {
calld->seen_recv_trailing_metadata_ready_ = true;
calld->on_recv_trailing_metadata_ready_error_ = error;
GRPC_CALL_COMBINER_STOP(
calld->call_combiner_,
"Deferring OnRecvTrailingMetadataReady until after "
"OnRecvInitialMetadataReady and OnRecvMessageReady");
return;
}
error = grpc_error_add_child(error, calld->error_);
calld->error_ = absl::OkStatus();
grpc_closure* closure = calld->original_recv_trailing_metadata_ready_;
calld->original_recv_trailing_metadata_ready_ = nullptr;
Closure::Run(DEBUG_LOCATION, closure, error);
}
void CallData::DecompressStartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
// Handle recv_initial_metadata.
if (batch->recv_initial_metadata) {
recv_initial_metadata_ =
batch->payload->recv_initial_metadata.recv_initial_metadata;
original_recv_initial_metadata_ready_ =
batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&on_recv_initial_metadata_ready_;
}
// Handle recv_message
if (batch->recv_message) {
recv_message_ = batch->payload->recv_message.recv_message;
recv_message_flags_ = batch->payload->recv_message.flags;
original_recv_message_ready_ =
batch->payload->recv_message.recv_message_ready;
batch->payload->recv_message.recv_message_ready = &on_recv_message_ready_;
}
// Handle recv_trailing_metadata
if (batch->recv_trailing_metadata) {
original_recv_trailing_metadata_ready_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&on_recv_trailing_metadata_ready_;
}
// Pass control down the stack.
grpc_call_next_op(elem, batch);
}
void DecompressStartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
CallData* calld = static_cast<CallData*>(elem->call_data);
calld->DecompressStartTransportStreamOpBatch(elem, batch);
}
grpc_error_handle DecompressInitCallElem(grpc_call_element* elem,
const grpc_call_element_args* args) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
new (elem->call_data) CallData(*args, chand);
return absl::OkStatus();
}
void DecompressDestroyCallElem(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
CallData* calld = static_cast<CallData*>(elem->call_data);
calld->~CallData();
}
grpc_error_handle DecompressInitChannelElem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
new (chand) ChannelData(args);
return absl::OkStatus();
}
void DecompressDestroyChannelElem(grpc_channel_element* elem) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
chand->~ChannelData();
}
} // namespace
const grpc_channel_filter MessageDecompressFilter = {
DecompressStartTransportStreamOpBatch,
nullptr,
grpc_channel_next_op,
sizeof(CallData),
DecompressInitCallElem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
DecompressDestroyCallElem,
sizeof(ChannelData),
DecompressInitChannelElem,
grpc_channel_stack_no_post_init,
DecompressDestroyChannelElem,
grpc_channel_next_get_info,
"message_decompress"};
} // namespace grpc_core

@ -1,32 +0,0 @@
//
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H
#define GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
namespace grpc_core {
extern const grpc_channel_filter MessageDecompressFilter;
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H \
*/

@ -1970,7 +1970,8 @@ void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
// what we want - which is safe because we haven't told anyone
// about the metadata yet
if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
s->recv_trailing_metadata_finished != nullptr) {
s->recv_trailing_metadata_finished != nullptr ||
!s->final_metadata_requested) {
s->trailing_metadata_buffer.Set(grpc_core::GrpcStatusMetadata(), status);
if (!message.empty()) {
s->trailing_metadata_buffer.Set(

@ -418,6 +418,10 @@ class ClientStream : public Orphanable {
message_to_send_->payload();
batch_payload_.send_message.flags = message_to_send_->flags();
} else {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%sPollConnectedChannel: half close",
Activity::current()->DebugTag().c_str());
}
GPR_ASSERT(!absl::holds_alternative<Closed>(send_message_state_));
client_trailing_metadata_ =
GetContext<Arena>()->MakePooled<ClientMetadata>(
@ -473,6 +477,7 @@ class ClientStream : public Orphanable {
if (server_initial_metadata_state_ == ServerInitialMetadataState::kSet &&
!absl::holds_alternative<PipeSender<MessageHandle>::PushType>(
recv_message_state_) &&
!absl::holds_alternative<PendingReceiveMessage>(recv_message_state_) &&
std::exchange(queued_trailing_metadata_, false)) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO,
@ -579,6 +584,10 @@ class ClientStream : public Orphanable {
recv_message_waker_.ActivityDebugTag().c_str());
}
} else {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%sRecvMessageBatchDone: received message",
recv_message_waker_.ActivityDebugTag().c_str());
}
auto pending =
absl::get_if<PendingReceiveMessage>(&recv_message_state_);
GPR_ASSERT(pending != nullptr);

@ -527,6 +527,12 @@ const char* BaseCallData::ReceiveMessage::StateString(State state) {
return "CANCELLED_WHILST_FORWARDING";
case State::kBatchCompletedButCancelled:
return "BATCH_COMPLETED_BUT_CANCELLED";
case State::kCancelledWhilstIdle:
return "CANCELLED_WHILST_IDLE";
case State::kCompletedWhilePulledFromPipe:
return "COMPLETED_WHILE_PULLED_FROM_PIPE";
case State::kCompletedWhilePushedToPipe:
return "COMPLETED_WHILE_PUSHED_TO_PIPE";
}
return "UNKNOWN";
}
@ -551,7 +557,10 @@ void BaseCallData::ReceiveMessage::StartOp(CapturedBatch& batch) {
case State::kBatchCompletedNoPipe:
case State::kPushedToPipe:
case State::kPulledFromPipe:
case State::kCompletedWhilePulledFromPipe:
case State::kCompletedWhilePushedToPipe:
abort();
case State::kCancelledWhilstIdle:
case State::kCancelled:
return;
}
@ -586,7 +595,10 @@ void BaseCallData::ReceiveMessage::GotPipe(PipeSender<MessageHandle>* sender) {
case State::kBatchCompleted:
case State::kPushedToPipe:
case State::kPulledFromPipe:
case State::kCompletedWhilePulledFromPipe:
case State::kCompletedWhilePushedToPipe:
case State::kCancelledWhilstForwarding:
case State::kCancelledWhilstIdle:
case State::kBatchCompletedButCancelled:
abort();
case State::kCancelled:
@ -610,6 +622,9 @@ void BaseCallData::ReceiveMessage::OnComplete(absl::Status status) {
case State::kBatchCompletedNoPipe:
case State::kCancelled:
case State::kBatchCompletedButCancelled:
case State::kCancelledWhilstIdle:
case State::kCompletedWhilePulledFromPipe:
case State::kCompletedWhilePushedToPipe:
abort();
case State::kForwardedBatchNoPipe:
state_ = State::kBatchCompletedNoPipe;
@ -636,28 +651,41 @@ void BaseCallData::ReceiveMessage::Done(const ServerMetadata& metadata,
}
switch (state_) {
case State::kInitial:
case State::kIdle:
state_ = State::kCancelled;
break;
case State::kIdle:
state_ = State::kCancelledWhilstIdle;
break;
case State::kForwardedBatch:
case State::kForwardedBatchNoPipe:
state_ = State::kCancelledWhilstForwarding;
break;
case State::kCompletedWhilePulledFromPipe:
case State::kCompletedWhilePushedToPipe:
case State::kPulledFromPipe:
case State::kPushedToPipe: {
auto status_code =
metadata.get(GrpcStatusMetadata()).value_or(GRPC_STATUS_OK);
GPR_ASSERT(status_code != GRPC_STATUS_OK);
push_.reset();
next_.reset();
flusher->AddClosure(intercepted_on_complete_,
StatusFromMetadata(metadata), "recv_message_done");
state_ = State::kCancelled;
metadata.get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
if (status_code == GRPC_STATUS_OK) {
if (state_ == State::kCompletedWhilePulledFromPipe ||
state_ == State::kPulledFromPipe) {
state_ = State::kCompletedWhilePulledFromPipe;
} else {
state_ = State::kCompletedWhilePushedToPipe;
}
} else {
push_.reset();
next_.reset();
flusher->AddClosure(intercepted_on_complete_,
StatusFromMetadata(metadata), "recv_message_done");
state_ = State::kCancelled;
}
} break;
case State::kBatchCompleted:
case State::kBatchCompletedNoPipe:
case State::kBatchCompletedButCancelled:
abort();
case State::kCancelledWhilstIdle:
case State::kCancelledWhilstForwarding:
case State::kCancelled:
break;
@ -666,8 +694,10 @@ void BaseCallData::ReceiveMessage::Done(const ServerMetadata& metadata,
void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher) {
if (grpc_trace_channel.enabled()) {
gpr_log(GPR_DEBUG, "%s ReceiveMessage.WakeInsideCombiner st=%s",
base_->LogTag().c_str(), StateString(state_));
gpr_log(GPR_DEBUG,
"%s ReceiveMessage.WakeInsideCombiner st=%s push?=%s next?=%s",
base_->LogTag().c_str(), StateString(state_),
push_.has_value() ? "yes" : "no", next_.has_value() ? "yes" : "no");
}
switch (state_) {
case State::kInitial:
@ -678,6 +708,10 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher) {
case State::kCancelledWhilstForwarding:
case State::kBatchCompletedNoPipe:
break;
case State::kCancelledWhilstIdle:
sender_->Close();
state_ = State::kCancelled;
break;
case State::kBatchCompletedButCancelled:
sender_->Close();
state_ = State::kCancelled;
@ -701,10 +735,16 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher) {
}
GPR_ASSERT(state_ == State::kPushedToPipe);
ABSL_FALLTHROUGH_INTENDED;
case State::kCompletedWhilePushedToPipe:
case State::kPushedToPipe: {
GPR_ASSERT(push_.has_value());
auto r_push = (*push_)();
if (auto* p = absl::get_if<bool>(&r_push)) {
if (grpc_trace_channel.enabled()) {
gpr_log(GPR_DEBUG,
"%s ReceiveMessage.WakeInsideCombiner push complete: %s",
base_->LogTag().c_str(), *p ? "true" : "false");
}
// We haven't pulled through yet, so this certainly shouldn't succeed.
GPR_ASSERT(!*p);
state_ = State::kCancelled;
@ -713,11 +753,21 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher) {
GPR_ASSERT(next_.has_value());
auto r_next = (*next_)();
if (auto* p = absl::get_if<NextResult<MessageHandle>>(&r_next)) {
if (grpc_trace_channel.enabled()) {
gpr_log(GPR_DEBUG,
"%s ReceiveMessage.WakeInsideCombiner next complete: %s",
base_->LogTag().c_str(),
p->has_value() ? "got message" : "end of stream");
}
next_.reset();
if (p->has_value()) {
*intercepted_slice_buffer_ = std::move(*(**p)->payload());
*intercepted_flags_ = (**p)->flags();
state_ = State::kPulledFromPipe;
if (state_ == State::kCompletedWhilePushedToPipe) {
state_ = State::kCompletedWhilePulledFromPipe;
} else {
state_ = State::kPulledFromPipe;
}
} else {
*intercepted_slice_buffer_ = absl::nullopt;
*intercepted_flags_ = 0;
@ -725,12 +775,26 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher) {
}
}
}
if (state_ != State::kPulledFromPipe) break;
if (state_ != State::kPulledFromPipe &&
state_ != State::kCompletedWhilePulledFromPipe) {
break;
}
ABSL_FALLTHROUGH_INTENDED;
case State::kCompletedWhilePulledFromPipe:
case State::kPulledFromPipe: {
GPR_ASSERT(push_.has_value());
if (!absl::holds_alternative<Pending>((*push_)())) {
state_ = State::kIdle;
if (grpc_trace_channel.enabled()) {
gpr_log(GPR_DEBUG,
"%s ReceiveMessage.WakeInsideCombiner push complete",
base_->LogTag().c_str());
}
if (state_ == State::kCompletedWhilePulledFromPipe) {
sender_->Close();
state_ = State::kCancelled;
} else {
state_ = State::kIdle;
}
push_.reset();
flusher->AddClosure(std::exchange(intercepted_on_complete_, nullptr),
absl::OkStatus(), "recv_message");
@ -1512,8 +1576,10 @@ void ClientCallData::RecvTrailingMetadataReady(grpc_error_handle error) {
Flusher flusher(this);
if (grpc_trace_channel.enabled()) {
gpr_log(GPR_DEBUG,
"%s ClientCallData.RecvTrailingMetadataReady error=%s md=%s",
LogTag().c_str(), error.ToString().c_str(),
"%s ClientCallData.RecvTrailingMetadataReady "
"recv_trailing_state=%s error=%s md=%s",
LogTag().c_str(), StateString(recv_trailing_state_),
error.ToString().c_str(),
recv_trailing_metadata_->DebugString().c_str());
}
// If we were cancelled prior to receiving this callback, we should simply
@ -1536,6 +1602,9 @@ void ClientCallData::RecvTrailingMetadataReady(grpc_error_handle error) {
// Record that we've got the callback.
GPR_ASSERT(recv_trailing_state_ == RecvTrailingState::kForwarded);
recv_trailing_state_ = RecvTrailingState::kComplete;
if (receive_message() != nullptr) {
receive_message()->Done(*recv_trailing_metadata_, &flusher);
}
// Repoll the promise.
ScopedContext context(this);
WakeInsideCombiner(&flusher);

@ -393,6 +393,9 @@ class BaseCallData : public Activity, private Wakeable {
kPulledFromPipe,
// We're done.
kCancelled,
// Call got terminated whilst we were idle: we need to close the sender
// pipe next poll.
kCancelledWhilstIdle,
// Call got terminated whilst we had forwarded a recv_message down the
// stack: we need to keep track of that until we get the completion so
// that we do the right thing in OnComplete.
@ -402,6 +405,9 @@ class BaseCallData : public Activity, private Wakeable {
// On the next poll we'll close things out and forward on completions,
// then transition to cancelled.
kBatchCompletedButCancelled,
// Completed successfully while we're processing a recv message.
kCompletedWhilePushedToPipe,
kCompletedWhilePulledFromPipe,
};
static const char* StateString(State);

@ -21,11 +21,13 @@
#include "src/core/lib/compression/compression_internal.h"
#include <stdlib.h>
#include <string.h>
#include <string>
#include "absl/container/inlined_vector.h"
#include "absl/strings/ascii.h"
#include "absl/strings/str_split.h"
#include "absl/types/variant.h"
#include <grpc/support/log.h>
@ -163,19 +165,13 @@ CompressionAlgorithmSet CompressionAlgorithmSet::FromUint32(uint32_t value) {
}
CompressionAlgorithmSet CompressionAlgorithmSet::FromChannelArgs(
const grpc_channel_args* args) {
const ChannelArgs& args) {
CompressionAlgorithmSet set;
static const uint32_t kEverything =
(1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
if (args != nullptr) {
set = CompressionAlgorithmSet::FromUint32(grpc_channel_args_find_integer(
args, GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
grpc_integer_options{kEverything, 0, kEverything}));
set.Set(GRPC_COMPRESS_NONE);
} else {
set = CompressionAlgorithmSet::FromUint32(kEverything);
}
return set;
return CompressionAlgorithmSet::FromUint32(
args.GetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)
.value_or(kEverything));
}
CompressionAlgorithmSet::CompressionAlgorithmSet() = default;
@ -230,18 +226,14 @@ uint32_t CompressionAlgorithmSet::ToLegacyBitmask() const {
}
absl::optional<grpc_compression_algorithm>
DefaultCompressionAlgorithmFromChannelArgs(const grpc_channel_args* args) {
if (args == nullptr) return absl::nullopt;
for (size_t i = 0; i < args->num_args; i++) {
if (strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM) ==
0) {
if (args->args[i].type == GRPC_ARG_INTEGER) {
return static_cast<grpc_compression_algorithm>(
args->args[i].value.integer);
} else if (args->args[i].type == GRPC_ARG_STRING) {
return ParseCompressionAlgorithm(args->args[i].value.string);
}
}
DefaultCompressionAlgorithmFromChannelArgs(const ChannelArgs& args) {
auto* value = args.Get(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM);
if (value == nullptr) return absl::nullopt;
if (auto* p = absl::get_if<int>(value)) {
return static_cast<grpc_compression_algorithm>(*p);
}
if (auto* p = absl::get_if<std::string>(value)) {
return ParseCompressionAlgorithm(*p);
}
return absl::nullopt;
}

@ -29,8 +29,8 @@
#include "absl/types/optional.h"
#include <grpc/impl/codegen/compression_types.h>
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/slice/slice.h"
@ -46,7 +46,7 @@ const char* CompressionAlgorithmAsString(grpc_compression_algorithm algorithm);
// Retrieve the default compression algorithm from channel args, return nullopt
// if not found.
absl::optional<grpc_compression_algorithm>
DefaultCompressionAlgorithmFromChannelArgs(const grpc_channel_args* args);
DefaultCompressionAlgorithmFromChannelArgs(const ChannelArgs& args);
// A set of grpc_compression_algorithm values.
class CompressionAlgorithmSet {
@ -55,7 +55,7 @@ class CompressionAlgorithmSet {
// algorithm 1, etc.
static CompressionAlgorithmSet FromUint32(uint32_t value);
// Locate in channel args and construct from the found value.
static CompressionAlgorithmSet FromChannelArgs(const grpc_channel_args* args);
static CompressionAlgorithmSet FromChannelArgs(const ChannelArgs& args);
// Parse a string of comma-separated compression algorithms.
static CompressionAlgorithmSet FromString(absl::string_view str);
// Construct an empty set.

@ -171,6 +171,18 @@ class BitSet {
return result;
}
BitSet& Set(int i, bool value) {
set(i, value);
return *this;
}
BitSet& SetAll(bool value) {
for (size_t i = 0; i < kTotalBits; i++) {
set(i, value);
}
return *this;
}
private:
// Given a bit index, return which unit it's stored in.
static constexpr size_t unit_for(size_t bit) { return bit / kUnitBits; }

@ -1358,11 +1358,10 @@ void Server::CallData::RecvInitialMetadataReady(void* arg,
}
if (calld->host_.has_value() && calld->path_.has_value()) {
/* do nothing */
} else {
} else if (error.ok()) {
/* Pass the error reference to calld->recv_initial_metadata_error */
grpc_error_handle src_error = error;
error = GRPC_ERROR_CREATE_REFERENCING("Missing :authority or :path",
&src_error, 1);
error = absl::UnknownError("Missing :authority or :path");
calld->recv_initial_metadata_error_ = error;
}
grpc_closure* closure = calld->original_recv_initial_metadata_ready_;

@ -204,7 +204,10 @@ StaticSlice HttpMethodMetadata::Encode(ValueType x) {
case kGet:
return StaticSlice::FromStaticString("GET");
default:
abort();
// TODO(ctiller): this should be an abort, we should split up the debug
// string generation from the encode string generation so that debug
// strings can always succeed and encode strings can crash.
return StaticSlice::FromStaticString("<<INVALID METHOD>>");
}
}

@ -80,8 +80,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/http/client/http_client_filter.cc',
'src/core/ext/filters/http/client_authority_filter.cc',
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/message_compress_filter.cc',
'src/core/ext/filters/http/message_compress/message_decompress_filter.cc',
'src/core/ext/filters/http/message_compress/compression_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/rbac/rbac_filter.cc',

@ -64,6 +64,9 @@ grpc_cc_test(
grpc_cc_test(
name = "minimal_stack_is_minimal_test",
srcs = ["minimal_stack_is_minimal_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
uses_event_engine = False,
uses_polling = False,

@ -29,14 +29,13 @@
* configurations and assess whether such a change is correct and desirable.
*/
#include <stdarg.h>
#include <string.h>
#include <algorithm>
#include <string>
#include <vector>
#include "absl/strings/str_join.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
@ -52,88 +51,12 @@
#include "src/core/lib/transport/transport_impl.h"
#include "test/core/util/test_config.h"
// use CHECK_STACK instead
static int check_stack(const char* file, int line, const char* transport_name,
grpc_channel_args* init_args,
unsigned channel_stack_type, ...);
// arguments: const char *transport_name - the name of the transport type to
// simulate
// grpc_channel_args *init_args - channel args to pass down
// grpc_channel_stack_type channel_stack_type - the archetype of
// channel stack to create
// variadic arguments - the (in-order) expected list of channel
// filters to instantiate, terminated with NULL
#define CHECK_STACK(...) check_stack(__FILE__, __LINE__, __VA_ARGS__)
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
int errors = 0;
// tests with a minimal stack
grpc_arg minimal_stack_arg;
minimal_stack_arg.type = GRPC_ARG_INTEGER;
minimal_stack_arg.key = const_cast<char*>(GRPC_ARG_MINIMAL_STACK);
minimal_stack_arg.value.integer = 1;
grpc_channel_args minimal_stack_args = {1, &minimal_stack_arg};
errors +=
CHECK_STACK("unknown", &minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL,
"authority", "connected", NULL);
errors += CHECK_STACK("unknown", &minimal_stack_args, GRPC_CLIENT_SUBCHANNEL,
"authority", "connected", NULL);
errors += CHECK_STACK("unknown", &minimal_stack_args, GRPC_SERVER_CHANNEL,
"server", "connected", NULL);
errors +=
CHECK_STACK("chttp2", &minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL,
"authority", "http-client", "connected", NULL);
errors += CHECK_STACK("chttp2", &minimal_stack_args, GRPC_CLIENT_SUBCHANNEL,
"authority", "http-client", "connected", NULL);
errors += CHECK_STACK("chttp2", &minimal_stack_args, GRPC_SERVER_CHANNEL,
"server", "http-server", "connected", NULL);
errors += CHECK_STACK(nullptr, &minimal_stack_args, GRPC_CLIENT_CHANNEL,
"client-channel", NULL);
// tests with a default stack
errors +=
CHECK_STACK("unknown", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, "authority",
"message_size", "deadline", "connected", NULL);
errors += CHECK_STACK("unknown", nullptr, GRPC_CLIENT_SUBCHANNEL, "authority",
"message_size", "connected", NULL);
errors += CHECK_STACK("unknown", nullptr, GRPC_SERVER_CHANNEL, "server",
"message_size", "deadline", "connected", NULL);
errors +=
CHECK_STACK("chttp2", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, "authority",
"message_size", "deadline", "http-client",
"message_decompress", "message_compress", "connected", NULL);
errors += CHECK_STACK("chttp2", nullptr, GRPC_CLIENT_SUBCHANNEL, "authority",
"message_size", "http-client", "message_decompress",
"message_compress", "connected", NULL);
errors +=
CHECK_STACK("chttp2", nullptr, GRPC_SERVER_CHANNEL, "server",
"message_size", "deadline", "http-server",
"message_decompress", "message_compress", "connected", NULL);
errors += CHECK_STACK(nullptr, nullptr, GRPC_CLIENT_CHANNEL, "client-channel",
NULL);
GPR_ASSERT(errors == 0);
grpc_shutdown();
return 0;
}
/*******************************************************************************
* End of tests definitions, start of test infrastructure
*/
static int check_stack(const char* file, int line, const char* transport_name,
grpc_channel_args* init_args,
unsigned channel_stack_type, ...) {
grpc_core::ChannelArgs channel_args =
grpc_core::ChannelArgs::FromC(init_args);
std::vector<std::string> MakeStack(const char* transport_name,
const grpc_core::ChannelArgs& channel_args,
grpc_channel_stack_type channel_stack_type) {
// create phony channel stack
grpc_core::ChannelStackBuilderImpl builder(
"test", static_cast<grpc_channel_stack_type>(channel_stack_type),
channel_args);
grpc_core::ChannelStackBuilderImpl builder("test", channel_stack_type,
channel_args);
grpc_transport_vtable fake_transport_vtable;
memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable));
fake_transport_vtable.name = transport_name;
@ -148,44 +71,75 @@ static int check_stack(const char* file, int line, const char* transport_name,
&builder));
}
// build up our expectation list
std::vector<std::string> parts;
va_list args;
va_start(args, channel_stack_type);
for (;;) {
char* a = va_arg(args, char*);
if (a == nullptr) break;
parts.push_back(a);
}
va_end(args);
std::string expect = absl::StrJoin(parts, ", ");
// build up our "got" list
parts.clear();
for (const auto& entry : *builder.mutable_stack()) {
const char* name = entry->name;
if (name == nullptr) continue;
parts.push_back(name);
}
std::string got = absl::StrJoin(parts, ", ");
// figure out result, log if there's an error
int result = 0;
if (got != expect) {
std::string args_str = channel_args.ToString();
gpr_log(file, line, GPR_LOG_SEVERITY_ERROR,
"**************************************************");
gpr_log(
file, line, GPR_LOG_SEVERITY_ERROR,
"FAILED transport=%s; stack_type=%s; channel_args=%s:", transport_name,
grpc_channel_stack_type_string(
static_cast<grpc_channel_stack_type>(channel_stack_type)),
args_str.c_str());
gpr_log(file, line, GPR_LOG_SEVERITY_ERROR, "EXPECTED: %s", expect.c_str());
gpr_log(file, line, GPR_LOG_SEVERITY_ERROR, "GOT: %s", got.c_str());
result = 1;
}
return result;
return parts;
}
TEST(ChannelStackFilters, LooksAsExpected) {
const auto minimal_stack_args =
grpc_core::ChannelArgs().Set(GRPC_ARG_MINIMAL_STACK, true);
const auto no_args = grpc_core::ChannelArgs();
EXPECT_EQ(
MakeStack("unknown", minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL),
std::vector<std::string>({"authority", "connected"}));
EXPECT_EQ(MakeStack("unknown", minimal_stack_args, GRPC_CLIENT_SUBCHANNEL),
std::vector<std::string>({"authority", "connected"}));
EXPECT_EQ(MakeStack("unknown", minimal_stack_args, GRPC_SERVER_CHANNEL),
std::vector<std::string>({"server", "connected"}));
EXPECT_EQ(MakeStack("chttp2", minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL),
std::vector<std::string>(
{"authority", "http-client", "compression", "connected"}));
EXPECT_EQ(MakeStack("chttp2", minimal_stack_args, GRPC_CLIENT_SUBCHANNEL),
std::vector<std::string>(
{"authority", "http-client", "compression", "connected"}));
EXPECT_EQ(MakeStack("chttp2", minimal_stack_args, GRPC_SERVER_CHANNEL),
std::vector<std::string>(
{"server", "http-server", "compression", "connected"}));
EXPECT_EQ(MakeStack(nullptr, minimal_stack_args, GRPC_CLIENT_CHANNEL),
std::vector<std::string>({"client-channel"}));
// tests with a default stack
EXPECT_EQ(MakeStack("unknown", no_args, GRPC_CLIENT_DIRECT_CHANNEL),
std::vector<std::string>(
{"authority", "message_size", "deadline", "connected"}));
EXPECT_EQ(
MakeStack("unknown", no_args, GRPC_CLIENT_SUBCHANNEL),
std::vector<std::string>({"authority", "message_size", "connected"}));
EXPECT_EQ(MakeStack("unknown", no_args, GRPC_SERVER_CHANNEL),
std::vector<std::string>(
{"server", "message_size", "deadline", "connected"}));
EXPECT_EQ(
MakeStack("chttp2", no_args, GRPC_CLIENT_DIRECT_CHANNEL),
std::vector<std::string>({"authority", "message_size", "deadline",
"http-client", "compression", "connected"}));
EXPECT_EQ(
MakeStack("chttp2", no_args, GRPC_CLIENT_SUBCHANNEL),
std::vector<std::string>({"authority", "message_size", "http-client",
"compression", "connected"}));
EXPECT_EQ(
MakeStack("chttp2", no_args, GRPC_SERVER_CHANNEL),
std::vector<std::string>({"server", "message_size", "deadline",
"http-server", "compression", "connected"}));
EXPECT_EQ(MakeStack(nullptr, no_args, GRPC_CLIENT_CHANNEL),
std::vector<std::string>({"client-channel"}));
}
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
grpc_init();
int r = RUN_ALL_TESTS();
grpc_shutdown();
return r;
}

@ -14,7 +14,6 @@
load(
"//bazel:grpc_build_system.bzl",
"grpc_cc_library",
"grpc_cc_test",
"grpc_package",
)
@ -24,15 +23,6 @@ grpc_package(name = "test/core/compression")
licenses(["notice"])
grpc_cc_library(
name = "args_utils",
testonly = 1,
srcs = ["args_utils.cc"],
hdrs = ["args_utils.h"],
visibility = ["//visibility:public"],
deps = ["//:grpc"],
)
grpc_cc_test(
name = "compression_test",
srcs = ["compression_test.cc"],
@ -41,7 +31,6 @@ grpc_cc_test(
uses_event_engine = False,
uses_polling = False,
deps = [
":args_utils",
"//:gpr",
"//:grpc",
"//test/core/util:grpc_test_util",

@ -1,103 +0,0 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/compression/args_utils.h"
#include <string.h>
#include "absl/types/optional.h"
#include <grpc/compression.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/gpr/useful.h"
const grpc_channel_args*
grpc_channel_args_set_channel_default_compression_algorithm(
const grpc_channel_args* a, grpc_compression_algorithm algorithm) {
GPR_ASSERT(algorithm < GRPC_COMPRESS_ALGORITHMS_COUNT);
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
tmp.key = const_cast<char*>(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM);
tmp.value.integer = algorithm;
return grpc_channel_args_copy_and_add(a, &tmp, 1);
}
/** Returns 1 if the argument for compression algorithm's enabled states bitset
* was found in \a a, returning the arg's value in \a states. Otherwise, returns
* 0. */
static int find_compression_algorithm_states_bitset(const grpc_channel_args* a,
int** states_arg) {
if (a != nullptr) {
size_t i;
for (i = 0; i < a->num_args; ++i) {
if (a->args[i].type == GRPC_ARG_INTEGER &&
!strcmp(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
a->args[i].key)) {
*states_arg = &a->args[i].value.integer;
**states_arg =
(**states_arg & ((1 << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1)) |
0x1; /* forcefully enable support for no compression */
return 1;
}
}
}
return 0; /* GPR_FALSE */
}
const grpc_channel_args* grpc_channel_args_compression_algorithm_set_state(
const grpc_channel_args** a, grpc_compression_algorithm algorithm,
int state) {
int* states_arg = nullptr;
const grpc_channel_args* result = *a;
const int states_arg_found =
find_compression_algorithm_states_bitset(*a, &states_arg);
if (grpc_core::DefaultCompressionAlgorithmFromChannelArgs(*a) == algorithm &&
state == 0) {
const char* algo_name = nullptr;
GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name) != 0);
gpr_log(GPR_ERROR,
"Tried to disable default compression algorithm '%s'. The "
"operation has been ignored.",
algo_name);
} else if (states_arg_found) {
if (state != 0) {
grpc_core::SetBit(reinterpret_cast<unsigned*>(states_arg), algorithm);
} else if (algorithm != GRPC_COMPRESS_NONE) {
grpc_core::ClearBit(reinterpret_cast<unsigned*>(states_arg), algorithm);
}
} else {
/* create a new arg */
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
tmp.key =
const_cast<char*>(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET);
/* all enabled by default */
tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
if (state != 0) {
grpc_core::SetBit(reinterpret_cast<unsigned*>(&tmp.value.integer),
algorithm);
} else if (algorithm != GRPC_COMPRESS_NONE) {
grpc_core::ClearBit(reinterpret_cast<unsigned*>(&tmp.value.integer),
algorithm);
}
result = grpc_channel_args_copy_and_add(*a, &tmp, 1);
grpc_channel_args_destroy(*a);
*a = result;
}
return result;
}

@ -1,34 +0,0 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_TEST_CORE_COMPRESSION_ARGS_UTILS_H_H
#define GRPC_TEST_CORE_COMPRESSION_ARGS_UTILS_H_H
#include <grpc/compression.h>
#include <grpc/grpc.h>
// TODO(ctiller): when we do the channel args migration, just delete this.
const grpc_channel_args*
grpc_channel_args_set_channel_default_compression_algorithm(
const grpc_channel_args* a, grpc_compression_algorithm algorithm);
const grpc_channel_args* grpc_channel_args_compression_algorithm_set_state(
const grpc_channel_args** a, grpc_compression_algorithm algorithm,
int state);
const grpc_channel_args*
grpc_channel_args_set_channel_default_compression_algorithm(
const grpc_channel_args* a, grpc_compression_algorithm algorithm);
#endif

@ -22,15 +22,10 @@
#include "gtest/gtest.h"
#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/compression/args_utils.h"
#include "test/core/util/test_config.h"
TEST(CompressionTest, CompressionAlgorithmParse) {
@ -245,70 +240,6 @@ TEST(CompressionTest, CompressionEnableDisableAlgorithm) {
}
}
TEST(CompressionTest, ChannelArgsSetCompressionAlgorithm) {
grpc_core::ExecCtx exec_ctx;
const grpc_channel_args* ch_args;
ch_args = grpc_channel_args_set_channel_default_compression_algorithm(
nullptr, GRPC_COMPRESS_GZIP);
ASSERT_EQ(ch_args->num_args, 1);
ASSERT_STREQ(ch_args->args[0].key,
GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM);
ASSERT_EQ(ch_args->args[0].type, GRPC_ARG_INTEGER);
grpc_channel_args_destroy(ch_args);
}
TEST(CompressionTest, ChannelArgsCompressionAlgorithmStates) {
grpc_core::ExecCtx exec_ctx;
grpc_core::CompressionAlgorithmSet states;
const grpc_channel_args* ch_args =
grpc_channel_args_copy_and_add(nullptr, nullptr, 0);
/* by default, all enabled */
states = grpc_core::CompressionAlgorithmSet::FromChannelArgs(ch_args);
for (size_t i = 0; i < GRPC_COMPRESS_ALGORITHMS_COUNT; i++) {
ASSERT_TRUE(states.IsSet(static_cast<grpc_compression_algorithm>(i)));
}
/* disable gzip and deflate and stream/gzip */
const grpc_channel_args* ch_args_wo_gzip =
grpc_channel_args_compression_algorithm_set_state(&ch_args,
GRPC_COMPRESS_GZIP, 0);
ASSERT_EQ(ch_args, ch_args_wo_gzip);
const grpc_channel_args* ch_args_wo_gzip_deflate =
grpc_channel_args_compression_algorithm_set_state(
&ch_args_wo_gzip, GRPC_COMPRESS_DEFLATE, 0);
ASSERT_EQ(ch_args_wo_gzip, ch_args_wo_gzip_deflate);
states = grpc_core::CompressionAlgorithmSet::FromChannelArgs(
ch_args_wo_gzip_deflate);
for (size_t i = 0; i < GRPC_COMPRESS_ALGORITHMS_COUNT; i++) {
if (i == GRPC_COMPRESS_GZIP || i == GRPC_COMPRESS_DEFLATE) {
ASSERT_FALSE(states.IsSet(static_cast<grpc_compression_algorithm>(i)));
} else {
ASSERT_TRUE(states.IsSet(static_cast<grpc_compression_algorithm>(i)));
}
}
/* re-enabled gzip only */
ch_args_wo_gzip = grpc_channel_args_compression_algorithm_set_state(
&ch_args_wo_gzip_deflate, GRPC_COMPRESS_GZIP, 1);
ASSERT_EQ(ch_args_wo_gzip, ch_args_wo_gzip_deflate);
states = grpc_core::CompressionAlgorithmSet::FromChannelArgs(ch_args_wo_gzip);
for (size_t i = 0; i < GRPC_COMPRESS_ALGORITHMS_COUNT; i++) {
if (i == GRPC_COMPRESS_DEFLATE) {
ASSERT_FALSE(states.IsSet(static_cast<grpc_compression_algorithm>(i)));
} else {
ASSERT_TRUE(states.IsSet(static_cast<grpc_compression_algorithm>(i)));
}
}
grpc_channel_args_destroy(ch_args);
}
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);

@ -28,19 +28,12 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/compression/args_utils.h"
#include "test/core/end2end/end2end_tests.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
struct fullstack_compression_fixture_data {
~fullstack_compression_fixture_data() {
grpc_channel_args_destroy(client_args_compression);
grpc_channel_args_destroy(server_args_compression);
}
std::string localaddr;
const grpc_channel_args* client_args_compression = nullptr;
const grpc_channel_args* server_args_compression = nullptr;
};
static grpc_end2end_test_fixture chttp2_create_fixture_fullstack_compression(
@ -63,16 +56,14 @@ void chttp2_init_client_fullstack_compression(
grpc_end2end_test_fixture* f, const grpc_channel_args* client_args) {
fullstack_compression_fixture_data* ffd =
static_cast<fullstack_compression_fixture_data*>(f->fixture_data);
if (ffd->client_args_compression != nullptr) {
grpc_core::ExecCtx exec_ctx;
grpc_channel_args_destroy(ffd->client_args_compression);
}
ffd->client_args_compression =
grpc_channel_args_set_channel_default_compression_algorithm(
client_args, GRPC_COMPRESS_GZIP);
grpc_channel_credentials* creds = grpc_insecure_credentials_create();
f->client = grpc_channel_create(ffd->localaddr.c_str(), creds,
ffd->client_args_compression);
f->client = grpc_channel_create(
ffd->localaddr.c_str(), creds,
grpc_core::ChannelArgs::FromC(client_args)
.SetIfUnset(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
GRPC_COMPRESS_GZIP)
.ToC()
.get());
grpc_channel_credentials_release(creds);
}
@ -80,17 +71,16 @@ void chttp2_init_server_fullstack_compression(
grpc_end2end_test_fixture* f, const grpc_channel_args* server_args) {
fullstack_compression_fixture_data* ffd =
static_cast<fullstack_compression_fixture_data*>(f->fixture_data);
if (ffd->server_args_compression != nullptr) {
grpc_core::ExecCtx exec_ctx;
grpc_channel_args_destroy(ffd->server_args_compression);
}
ffd->server_args_compression =
grpc_channel_args_set_channel_default_compression_algorithm(
server_args, GRPC_COMPRESS_GZIP);
if (f->server) {
grpc_server_destroy(f->server);
}
f->server = grpc_server_create(ffd->server_args_compression, nullptr);
f->server = grpc_server_create(
grpc_core::ChannelArgs::FromC(server_args)
.SetIfUnset(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
GRPC_COMPRESS_GZIP)
.ToC()
.get(),
nullptr);
grpc_server_register_completion_queue(f->server, f->cq, nullptr);
grpc_server_credentials* server_creds =
grpc_insecure_server_credentials_create();

@ -17,8 +17,11 @@
*/
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <string>
#include "absl/status/statusor.h"
#include <grpc/byte_buffer.h>
@ -39,6 +42,7 @@
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/event_string.h"
#include "src/core/lib/transport/transport_fwd.h"
#include "test/core/util/mock_endpoint.h"
@ -151,13 +155,23 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
for (int i = 0; i < requested_calls; i++) {
ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME),
nullptr);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
if (ev.type != GRPC_OP_COMPLETE) {
gpr_log(GPR_ERROR,
"[%d/%d requested calls] Unexpected event type (expected "
"COMPLETE): %s",
i, requested_calls, grpc_event_string(&ev).c_str());
abort();
}
}
grpc_completion_queue_shutdown(cq);
for (int i = 0; i < requested_calls; i++) {
ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME),
nullptr);
GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
if (ev.type != GRPC_QUEUE_SHUTDOWN) {
gpr_log(GPR_ERROR, "Unexpected event type (expected SHUTDOWN): %s",
grpc_event_string(&ev).c_str());
abort();
}
}
grpc_call_unref(call);
grpc_completion_queue_destroy(cq);

@ -450,7 +450,6 @@ def grpc_end2end_tests():
":local_util",
"//test/core/util:test_lb_policies",
"//:grpc_authorization_provider",
"//test/core/compression:args_utils",
"//:grpc_http_filters",
"//src/core:event_log",
],
@ -475,7 +474,6 @@ def grpc_end2end_tests():
"//test/core/util:grpc_test_util",
"//:grpc",
"//:gpr",
"//test/core/compression:args_utils",
"//:grpc_http_filters",
],
tags = _platform_support_tags(fopt) + fopt.tags,

@ -33,9 +33,9 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/call_test_only.h"
#include "test/core/compression/args_utils.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/end2end_tests.h"
#include "test/core/util/test_config.h"
@ -109,8 +109,8 @@ static void request_for_disabled_algorithm(
grpc_call* s;
grpc_slice request_payload_slice;
grpc_byte_buffer* request_payload;
const grpc_channel_args* client_args;
const grpc_channel_args* server_args;
grpc_core::ChannelArgs client_args;
grpc_core::ChannelArgs server_args;
grpc_end2end_test_fixture f;
grpc_op ops[6];
grpc_op* op;
@ -130,28 +130,26 @@ static void request_for_disabled_algorithm(
request_payload_slice = grpc_slice_from_copied_string(str);
request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
client_args = grpc_channel_args_set_channel_default_compression_algorithm(
nullptr, requested_client_compression_algorithm);
server_args = grpc_channel_args_set_channel_default_compression_algorithm(
nullptr, GRPC_COMPRESS_NONE);
server_args = grpc_channel_args_compression_algorithm_set_state(
&server_args, algorithm_to_disable, false);
client_args =
grpc_core::ChannelArgs().Set(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
requested_client_compression_algorithm);
server_args =
grpc_core::ChannelArgs()
.Set(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, GRPC_COMPRESS_NONE)
.Set(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
grpc_core::BitSet<GRPC_COMPRESS_ALGORITHMS_COUNT>()
.SetAll(true)
.Set(algorithm_to_disable, false)
.ToInt<uint32_t>());
if (!decompress_in_core) {
grpc_arg disable_decompression_in_core_arg =
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION), 0);
const grpc_channel_args* old_client_args = client_args;
const grpc_channel_args* old_server_args = server_args;
client_args = grpc_channel_args_copy_and_add(
client_args, &disable_decompression_in_core_arg, 1);
server_args = grpc_channel_args_copy_and_add(
server_args, &disable_decompression_in_core_arg, 1);
grpc_channel_args_destroy(old_client_args);
grpc_channel_args_destroy(old_server_args);
client_args =
client_args.Set(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, false);
server_args =
server_args.Set(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, false);
}
f = begin_test(config, test_name, client_args, server_args,
decompress_in_core);
f = begin_test(config, test_name, client_args.ToC().get(),
server_args.ToC().get(), decompress_in_core);
grpc_core::CqVerifier cqv(f.cq);
gpr_timespec deadline = five_seconds_from_now();
@ -266,8 +264,6 @@ static void request_for_disabled_algorithm(
grpc_slice_unref(request_payload_slice);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(request_payload_recv);
grpc_channel_args_destroy(client_args);
grpc_channel_args_destroy(server_args);
end_test(&f);
config.tear_down_data(&f);
}
@ -286,8 +282,8 @@ static void request_with_payload_template_inner(
grpc_call* s;
grpc_slice request_payload_slice;
grpc_byte_buffer* request_payload = nullptr;
const grpc_channel_args* client_args;
const grpc_channel_args* server_args;
grpc_core::ChannelArgs client_args;
grpc_core::ChannelArgs server_args;
grpc_end2end_test_fixture f;
grpc_op ops[6];
grpc_op* op;
@ -315,25 +311,20 @@ static void request_with_payload_template_inner(
grpc_slice response_payload_slice =
grpc_slice_from_copied_string(response_str);
client_args = grpc_channel_args_set_channel_default_compression_algorithm(
nullptr, default_client_channel_compression_algorithm);
server_args = grpc_channel_args_set_channel_default_compression_algorithm(
nullptr, default_server_channel_compression_algorithm);
client_args = grpc_core::ChannelArgs().Set(
GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
default_client_channel_compression_algorithm);
server_args = grpc_core::ChannelArgs().Set(
GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
default_server_channel_compression_algorithm);
if (!decompress_in_core) {
grpc_arg disable_decompression_in_core_arg =
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION), 0);
const grpc_channel_args* old_client_args = client_args;
const grpc_channel_args* old_server_args = server_args;
client_args = grpc_channel_args_copy_and_add(
client_args, &disable_decompression_in_core_arg, 1);
server_args = grpc_channel_args_copy_and_add(
server_args, &disable_decompression_in_core_arg, 1);
grpc_channel_args_destroy(old_client_args);
grpc_channel_args_destroy(old_server_args);
client_args =
client_args.Set(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, false);
server_args =
server_args.Set(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, false);
}
f = begin_test(config, test_name, client_args, server_args,
decompress_in_core);
f = begin_test(config, test_name, client_args.ToC().get(),
server_args.ToC().get(), decompress_in_core);
grpc_core::CqVerifier cqv(f.cq);
gpr_timespec deadline = five_seconds_from_now();
@ -558,8 +549,6 @@ static void request_with_payload_template_inner(
grpc_call_unref(c);
grpc_call_unref(s);
grpc_channel_args_destroy(client_args);
grpc_channel_args_destroy(server_args);
end_test(&f);
config.tear_down_data(&f);
}

@ -19,6 +19,8 @@
#include <stdint.h>
#include <string.h>
#include <string>
#include "absl/strings/match.h"
#include <grpc/byte_buffer.h>
@ -43,7 +45,10 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
grpc_channel_args* client_args,
grpc_channel_args* server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
gpr_log(GPR_INFO, "\n\n\nRunning test: %s/%s client_args=%s server_args=%s",
test_name, config.name,
grpc_core::ChannelArgs::FromC(client_args).ToString().c_str(),
grpc_core::ChannelArgs::FromC(server_args).ToString().c_str());
// We intentionally do not pass the client and server args to
// create_fixture(), since we don't want the limit enforced on the
// proxy, only on the backend server.
@ -482,11 +487,8 @@ static grpc_metadata gzip_compression_override() {
// Test receive message limit with compressed request larger than the limit
static void test_max_receive_message_length_on_compressed_request(
grpc_end2end_test_config config, bool minimal_stack) {
gpr_log(GPR_INFO,
"test max receive message length on compressed request with "
"minimal_stack=%d",
minimal_stack);
grpc_end2end_test_config config) {
gpr_log(GPR_INFO, "test max receive message length on compressed request");
grpc_end2end_test_fixture f;
grpc_call* c = nullptr;
grpc_call* s = nullptr;
@ -503,17 +505,15 @@ static void test_max_receive_message_length_on_compressed_request(
grpc_call_details call_details;
grpc_status_code status;
grpc_call_error error;
grpc_slice details, status_details;
grpc_slice details;
int was_cancelled = 2;
// Set limit via channel args.
grpc_arg arg[2];
grpc_arg arg[1];
arg[0] = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), 5);
arg[1] = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_MINIMAL_STACK), minimal_stack);
grpc_channel_args* server_args =
grpc_channel_args_copy_and_add(nullptr, arg, 2);
grpc_channel_args_copy_and_add(nullptr, arg, 1);
f = begin_test(config, "test_max_request_message_length", nullptr,
server_args);
@ -592,41 +592,20 @@ static void test_max_receive_message_length_on_compressed_request(
op->flags = 0;
op->reserved = nullptr;
op++;
if (minimal_stack) {
/* Expect the RPC to proceed normally for a minimal stack */
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
status_details = grpc_slice_from_static_string("xyz");
op->data.send_status_from_server.status_details = &status_details;
op->flags = 0;
op->reserved = nullptr;
op++;
}
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
cqv.Expect(tag(102), minimal_stack);
cqv.Expect(tag(102), false);
cqv.Expect(tag(103), true);
cqv.Expect(tag(1), true);
cqv.Verify();
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
if (minimal_stack) {
/* We do not perform message size checks for minimal stack. */
GPR_ASSERT(status == GRPC_STATUS_OK);
} else {
GPR_ASSERT(was_cancelled == 1);
GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
GPR_ASSERT(absl::StartsWith(grpc_core::StringViewFromSlice(details),
"Received message larger than max"));
}
GPR_ASSERT(was_cancelled == 1);
GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
GPR_ASSERT(absl::StartsWith(grpc_core::StringViewFromSlice(details),
"Received message larger than max"));
grpc_slice_unref(details);
grpc_slice_unref(request_payload_slice);
grpc_metadata_array_destroy(&initial_metadata_recv);
@ -644,11 +623,9 @@ static void test_max_receive_message_length_on_compressed_request(
// Test receive message limit with compressed response larger than the limit.
static void test_max_receive_message_length_on_compressed_response(
grpc_end2end_test_config config, bool minimal_stack) {
grpc_end2end_test_config config) {
gpr_log(GPR_INFO,
"testing max receive message length on compressed response with "
"minimal_stack=%d",
minimal_stack);
"testing max receive message length on compressed response");
grpc_end2end_test_fixture f;
grpc_call* c = nullptr;
grpc_call* s = nullptr;
@ -669,13 +646,11 @@ static void test_max_receive_message_length_on_compressed_response(
int was_cancelled = 2;
// Set limit via channel args.
grpc_arg arg[2];
grpc_arg arg[1];
arg[0] = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), 5);
arg[1] = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_MINIMAL_STACK), minimal_stack);
grpc_channel_args* client_args =
grpc_channel_args_copy_and_add(nullptr, arg, 2);
grpc_channel_args_copy_and_add(nullptr, arg, 1);
f = begin_test(config, "test_max_response_message_length", client_args,
nullptr);
@ -771,14 +746,9 @@ static void test_max_receive_message_length_on_compressed_response(
cqv.Verify();
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
if (minimal_stack) {
/* We do not perform message size checks for minimal stack. */
GPR_ASSERT(status == GRPC_STATUS_OK);
} else {
GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
GPR_ASSERT(absl::StartsWith(grpc_core::StringViewFromSlice(details),
"Received message larger than max"));
}
GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
GPR_ASSERT(absl::StartsWith(grpc_core::StringViewFromSlice(details),
"Received message larger than max"));
grpc_slice_unref(details);
grpc_slice_unref(response_payload_slice);
grpc_metadata_array_destroy(&initial_metadata_recv);
@ -824,10 +794,8 @@ void max_message_length(grpc_end2end_test_config config) {
* with our simple proxy. */
if (strcmp(config.name, "inproc") != 0 &&
(config.feature_mask & FEATURE_MASK_SUPPORTS_REQUEST_PROXYING) == 0) {
test_max_receive_message_length_on_compressed_request(config, false);
test_max_receive_message_length_on_compressed_request(config, true);
test_max_receive_message_length_on_compressed_response(config, false);
test_max_receive_message_length_on_compressed_response(config, true);
test_max_receive_message_length_on_compressed_request(config);
test_max_receive_message_length_on_compressed_response(config);
}
}

@ -83,8 +83,8 @@ static void shutdown_client(grpc_end2end_test_fixture* f) {
}
static void end_test(grpc_end2end_test_fixture* f) {
shutdown_server(f);
shutdown_client(f);
shutdown_server(f);
grpc_completion_queue_shutdown(f->cq);
drain_cq(f->cq);

@ -35,7 +35,7 @@
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/ext/filters/http/client/http_client_filter.h"
#include "src/core/ext/filters/http/message_compress/message_compress_filter.h"
#include "src/core/ext/filters/http/message_compress/compression_filter.h"
#include "src/core/ext/filters/http/server/http_server_filter.h"
#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/channel_args.h"
@ -581,9 +581,10 @@ BENCHMARK_TEMPLATE(BM_IsolatedFilter, PhonyFilter, SendEmptyMetadata);
typedef Fixture<&grpc_core::ClientChannel::kFilterVtable, 0>
ClientChannelFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, ClientChannelFilter, NoOp);
typedef Fixture<&grpc_message_compress_filter, CHECKS_NOT_LAST> CompressFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, CompressFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, CompressFilter, SendEmptyMetadata);
typedef Fixture<&grpc_core::ClientCompressionFilter::kFilter, CHECKS_NOT_LAST>
ClientCompressFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, ClientCompressFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, ClientCompressFilter, SendEmptyMetadata);
typedef Fixture<&grpc_client_deadline_filter, CHECKS_NOT_LAST>
ClientDeadlineFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, ClientDeadlineFilter, NoOp);
@ -601,9 +602,10 @@ typedef Fixture<&grpc_core::HttpServerFilter::kFilter, CHECKS_NOT_LAST>
HttpServerFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpServerFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpServerFilter, SendEmptyMetadata);
typedef Fixture<&grpc_message_size_filter, CHECKS_NOT_LAST> MessageSizeFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, SendEmptyMetadata);
typedef Fixture<&grpc_core::ServerCompressionFilter::kFilter, CHECKS_NOT_LAST>
ServerCompressFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, ServerCompressFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, ServerCompressFilter, SendEmptyMetadata);
// This cmake target is disabled for now because it depends on OpenCensus, which
// is Bazel-only.
// typedef Fixture<&grpc_server_load_reporting_filter, CHECKS_NOT_LAST>

@ -1172,10 +1172,8 @@ src/core/ext/filters/http/client/http_client_filter.h \
src/core/ext/filters/http/client_authority_filter.cc \
src/core/ext/filters/http/client_authority_filter.h \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.h \
src/core/ext/filters/http/message_compress/message_decompress_filter.cc \
src/core/ext/filters/http/message_compress/message_decompress_filter.h \
src/core/ext/filters/http/message_compress/compression_filter.cc \
src/core/ext/filters/http/message_compress/compression_filter.h \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/http/server/http_server_filter.h \
src/core/ext/filters/message_size/message_size_filter.cc \

@ -989,10 +989,8 @@ src/core/ext/filters/http/client/http_client_filter.h \
src/core/ext/filters/http/client_authority_filter.cc \
src/core/ext/filters/http/client_authority_filter.h \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.cc \
src/core/ext/filters/http/message_compress/message_compress_filter.h \
src/core/ext/filters/http/message_compress/message_decompress_filter.cc \
src/core/ext/filters/http/message_compress/message_decompress_filter.h \
src/core/ext/filters/http/message_compress/compression_filter.cc \
src/core/ext/filters/http/message_compress/compression_filter.h \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/http/server/http_server_filter.h \
src/core/ext/filters/message_size/message_size_filter.cc \

@ -178,16 +178,18 @@ def diff(bms, loops, regex, track, old, new):
js_old_opt = _read_json(
'%s.%s.opt.%s.%d.json' % (bm, stripped_line, old, loop),
badjson_files, nonexistant_files)
for row in bm_json.expand_json(js_new_opt):
name = row['cpp_name']
if name.endswith('_mean') or name.endswith('_stddev'):
continue
benchmarks[name].add_sample(track, row, True)
for row in bm_json.expand_json(js_old_opt):
name = row['cpp_name']
if name.endswith('_mean') or name.endswith('_stddev'):
continue
benchmarks[name].add_sample(track, row, False)
if js_new_opt:
for row in bm_json.expand_json(js_new_opt):
name = row['cpp_name']
if name.endswith('_mean') or name.endswith('_stddev'):
continue
benchmarks[name].add_sample(track, row, True)
if js_old_opt:
for row in bm_json.expand_json(js_old_opt):
name = row['cpp_name']
if name.endswith('_mean') or name.endswith('_stddev'):
continue
benchmarks[name].add_sample(track, row, False)
really_interesting = set()
for name, bm in benchmarks.items():

@ -209,30 +209,6 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "minimal_stack_is_minimal_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
@ -4695,6 +4671,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "minimal_stack_is_minimal_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save