diff --git a/BUILD b/BUILD index 17a5350705e..d8b99aa87ce 100644 --- a/BUILD +++ b/BUILD @@ -3114,14 +3114,12 @@ grpc_cc_library( srcs = [ "//src/core:ext/filters/http/client/http_client_filter.cc", "//src/core:ext/filters/http/http_filters_plugin.cc", - "//src/core:ext/filters/http/message_compress/message_compress_filter.cc", - "//src/core:ext/filters/http/message_compress/message_decompress_filter.cc", + "//src/core:ext/filters/http/message_compress/compression_filter.cc", "//src/core:ext/filters/http/server/http_server_filter.cc", ], hdrs = [ "//src/core:ext/filters/http/client/http_client_filter.h", - "//src/core:ext/filters/http/message_compress/message_compress_filter.h", - "//src/core:ext/filters/http/message_compress/message_decompress_filter.h", + "//src/core:ext/filters/http/message_compress/compression_filter.h", "//src/core:ext/filters/http/server/http_server_filter.h", ], external_deps = [ @@ -3138,7 +3136,6 @@ grpc_cc_library( deps = [ "channel_stack_builder", "config", - "debug_location", "gpr", "grpc_base", "grpc_public_hdrs", @@ -3151,16 +3148,17 @@ grpc_cc_library( "//src/core:channel_init", "//src/core:channel_stack_type", "//src/core:context", - "//src/core:error", + "//src/core:for_each", "//src/core:grpc_message_size_filter", "//src/core:latch", + "//src/core:map_pipe", "//src/core:percent_encoding", "//src/core:seq", "//src/core:slice", "//src/core:slice_buffer", - "//src/core:status_helper", "//src/core:transport_fwd", "//src/core:try_concurrently", + "//src/core:try_seq", ], ) diff --git a/CMakeLists.txt b/CMakeLists.txt index ea3d30ae05a..8b72b1dfdd9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -774,7 +774,6 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_c goaway_server_test) add_dependencies(buildtests_c inproc_callback_test) add_dependencies(buildtests_c invalid_call_argument_test) - add_dependencies(buildtests_c minimal_stack_is_minimal_test) add_dependencies(buildtests_c multiple_server_queues_test) add_dependencies(buildtests_c no_server_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX OR _gRPC_PLATFORM_WINDOWS) @@ -1033,6 +1032,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx message_compress_test) add_dependencies(buildtests_cxx message_size_service_config_test) add_dependencies(buildtests_cxx metadata_map_test) + add_dependencies(buildtests_cxx minimal_stack_is_minimal_test) add_dependencies(buildtests_cxx miscompile_with_no_unique_address_test) add_dependencies(buildtests_cxx mock_stream_test) add_dependencies(buildtests_cxx mock_test) @@ -1354,7 +1354,6 @@ endif() if(gRPC_BUILD_TESTS) add_library(end2end_tests - test/core/compression/args_utils.cc test/core/end2end/cq_verifier.cc test/core/end2end/data/client_certs.cc test/core/end2end/data/server1_cert.cc @@ -1722,8 +1721,7 @@ add_library(grpc src/core/ext/filters/http/client/http_client_filter.cc src/core/ext/filters/http/client_authority_filter.cc src/core/ext/filters/http/http_filters_plugin.cc - src/core/ext/filters/http/message_compress/message_compress_filter.cc - src/core/ext/filters/http/message_compress/message_decompress_filter.cc + src/core/ext/filters/http/message_compress/compression_filter.cc src/core/ext/filters/http/server/http_server_filter.cc src/core/ext/filters/message_size/message_size_filter.cc src/core/ext/filters/rbac/rbac_filter.cc @@ -2678,8 +2676,7 @@ add_library(grpc_unsecure src/core/ext/filters/http/client/http_client_filter.cc src/core/ext/filters/http/client_authority_filter.cc src/core/ext/filters/http/http_filters_plugin.cc - src/core/ext/filters/http/message_compress/message_compress_filter.cc - src/core/ext/filters/http/message_compress/message_decompress_filter.cc + src/core/ext/filters/http/message_compress/compression_filter.cc src/core/ext/filters/http/server/http_server_filter.cc src/core/ext/filters/message_size/message_size_filter.cc src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -5094,35 +5091,6 @@ target_link_libraries(invalid_call_argument_test ) -endif() -if(gRPC_BUILD_TESTS) - -add_executable(minimal_stack_is_minimal_test - test/core/channel/minimal_stack_is_minimal_test.cc -) - -target_include_directories(minimal_stack_is_minimal_test - PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - ${_gRPC_RE2_INCLUDE_DIR} - ${_gRPC_SSL_INCLUDE_DIR} - ${_gRPC_UPB_GENERATED_DIR} - ${_gRPC_UPB_GRPC_GENERATED_DIR} - ${_gRPC_UPB_INCLUDE_DIR} - ${_gRPC_XXHASH_INCLUDE_DIR} - ${_gRPC_ZLIB_INCLUDE_DIR} -) - -target_link_libraries(minimal_stack_is_minimal_test - ${_gRPC_BASELIB_LIBRARIES} - ${_gRPC_ZLIB_LIBRARIES} - ${_gRPC_ALLTARGETS_LIBRARIES} - grpc_test_util -) - - endif() if(gRPC_BUILD_TESTS) @@ -8882,7 +8850,6 @@ endif() if(gRPC_BUILD_TESTS) add_executable(compression_test - test/core/compression/args_utils.cc test/core/compression/compression_test.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc @@ -14762,6 +14729,43 @@ target_link_libraries(metadata_map_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(minimal_stack_is_minimal_test + test/core/channel/minimal_stack_is_minimal_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(minimal_stack_is_minimal_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(minimal_stack_is_minimal_test + ${_gRPC_BASELIB_LIBRARIES} + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ZLIB_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/Makefile b/Makefile index 4f306caa5e2..6e4089ff6f9 100644 --- a/Makefile +++ b/Makefile @@ -1023,8 +1023,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/http/client/http_client_filter.cc \ src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/http_filters_plugin.cc \ - src/core/ext/filters/http/message_compress/message_compress_filter.cc \ - src/core/ext/filters/http/message_compress/message_decompress_filter.cc \ + src/core/ext/filters/http/message_compress/compression_filter.cc \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \ src/core/ext/filters/rbac/rbac_filter.cc \ @@ -1838,8 +1837,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/http/client/http_client_filter.cc \ src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/http_filters_plugin.cc \ - src/core/ext/filters/http/message_compress/message_compress_filter.cc \ - src/core/ext/filters/http/message_compress/message_decompress_filter.cc \ + src/core/ext/filters/http/message_compress/compression_filter.cc \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \ src/core/ext/transport/chttp2/client/chttp2_connector.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 48a46cf525d..76123f7fad9 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -17,7 +17,6 @@ libs: language: c public_headers: [] headers: - - test/core/compression/args_utils.h - test/core/end2end/cq_verifier.h - test/core/end2end/data/ssl_test_data.h - test/core/end2end/end2end_tests.h @@ -27,7 +26,6 @@ libs: - test/core/end2end/tests/cancel_test_helpers.h - test/core/util/test_lb_policies.h src: - - test/core/compression/args_utils.cc - test/core/end2end/cq_verifier.cc - test/core/end2end/data/client_certs.cc - test/core/end2end/data/server1_cert.cc @@ -366,8 +364,7 @@ libs: - src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h - src/core/ext/filters/http/client/http_client_filter.h - src/core/ext/filters/http/client_authority_filter.h - - src/core/ext/filters/http/message_compress/message_compress_filter.h - - src/core/ext/filters/http/message_compress/message_decompress_filter.h + - src/core/ext/filters/http/message_compress/compression_filter.h - src/core/ext/filters/http/server/http_server_filter.h - src/core/ext/filters/message_size/message_size_filter.h - src/core/ext/filters/rbac/rbac_filter.h @@ -1111,8 +1108,7 @@ libs: - src/core/ext/filters/http/client/http_client_filter.cc - src/core/ext/filters/http/client_authority_filter.cc - src/core/ext/filters/http/http_filters_plugin.cc - - src/core/ext/filters/http/message_compress/message_compress_filter.cc - - src/core/ext/filters/http/message_compress/message_decompress_filter.cc + - src/core/ext/filters/http/message_compress/compression_filter.cc - src/core/ext/filters/http/server/http_server_filter.cc - src/core/ext/filters/message_size/message_size_filter.cc - src/core/ext/filters/rbac/rbac_filter.cc @@ -1938,8 +1934,7 @@ libs: - src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h - src/core/ext/filters/http/client/http_client_filter.h - src/core/ext/filters/http/client_authority_filter.h - - src/core/ext/filters/http/message_compress/message_compress_filter.h - - src/core/ext/filters/http/message_compress/message_decompress_filter.h + - src/core/ext/filters/http/message_compress/compression_filter.h - src/core/ext/filters/http/server/http_server_filter.h - src/core/ext/filters/message_size/message_size_filter.h - src/core/ext/transport/chttp2/client/chttp2_connector.h @@ -2324,8 +2319,7 @@ libs: - src/core/ext/filters/http/client/http_client_filter.cc - src/core/ext/filters/http/client_authority_filter.cc - src/core/ext/filters/http/http_filters_plugin.cc - - src/core/ext/filters/http/message_compress/message_compress_filter.cc - - src/core/ext/filters/http/message_compress/message_decompress_filter.cc + - src/core/ext/filters/http/message_compress/compression_filter.cc - src/core/ext/filters/http/server/http_server_filter.cc - src/core/ext/filters/message_size/message_size_filter.cc - src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -4152,15 +4146,6 @@ targets: - test/core/end2end/invalid_call_argument_test.cc deps: - grpc_test_util -- name: minimal_stack_is_minimal_test - build: test - language: c - headers: [] - src: - - test/core/channel/minimal_stack_is_minimal_test.cc - deps: - - grpc_test_util - uses_polling: false - name: multiple_server_queues_test build: test language: c @@ -5921,10 +5906,8 @@ targets: gtest: true build: test language: c++ - headers: - - test/core/compression/args_utils.h + headers: [] src: - - test/core/compression/args_utils.cc - test/core/compression/compression_test.cc deps: - grpc_test_util @@ -9095,6 +9078,16 @@ targets: - test/core/util/tracer_util.cc deps: - grpc_test_util +- name: minimal_stack_is_minimal_test + gtest: true + build: test + language: c++ + headers: [] + src: + - test/core/channel/minimal_stack_is_minimal_test.cc + deps: + - grpc_test_util + uses_polling: false - name: miscompile_with_no_unique_address_test gtest: true build: test diff --git a/config.m4 b/config.m4 index 2d5ee1613b1..ec070b69f1c 100644 --- a/config.m4 +++ b/config.m4 @@ -105,8 +105,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/http/client/http_client_filter.cc \ src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/http_filters_plugin.cc \ - src/core/ext/filters/http/message_compress/message_compress_filter.cc \ - src/core/ext/filters/http/message_compress/message_decompress_filter.cc \ + src/core/ext/filters/http/message_compress/compression_filter.cc \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \ src/core/ext/filters/rbac/rbac_filter.cc \ diff --git a/config.w32 b/config.w32 index 952d3c8847e..bd2e4d76f2a 100644 --- a/config.w32 +++ b/config.w32 @@ -71,8 +71,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\http\\client\\http_client_filter.cc " + "src\\core\\ext\\filters\\http\\client_authority_filter.cc " + "src\\core\\ext\\filters\\http\\http_filters_plugin.cc " + - "src\\core\\ext\\filters\\http\\message_compress\\message_compress_filter.cc " + - "src\\core\\ext\\filters\\http\\message_compress\\message_decompress_filter.cc " + + "src\\core\\ext\\filters\\http\\message_compress\\compression_filter.cc " + "src\\core\\ext\\filters\\http\\server\\http_server_filter.cc " + "src\\core\\ext\\filters\\message_size\\message_size_filter.cc " + "src\\core\\ext\\filters\\rbac\\rbac_filter.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 881f0ca3e4f..55b6b4b285d 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -275,8 +275,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h', 'src/core/ext/filters/http/client/http_client_filter.h', 'src/core/ext/filters/http/client_authority_filter.h', - 'src/core/ext/filters/http/message_compress/message_compress_filter.h', - 'src/core/ext/filters/http/message_compress/message_decompress_filter.h', + 'src/core/ext/filters/http/message_compress/compression_filter.h', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/message_size/message_size_filter.h', 'src/core/ext/filters/rbac/rbac_filter.h', @@ -1180,8 +1179,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h', 'src/core/ext/filters/http/client/http_client_filter.h', 'src/core/ext/filters/http/client_authority_filter.h', - 'src/core/ext/filters/http/message_compress/message_compress_filter.h', - 'src/core/ext/filters/http/message_compress/message_decompress_filter.h', + 'src/core/ext/filters/http/message_compress/compression_filter.h', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/message_size/message_size_filter.h', 'src/core/ext/filters/rbac/rbac_filter.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 3febb0d1c27..44a0fb195ea 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -311,10 +311,8 @@ Pod::Spec.new do |s| 'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/client_authority_filter.h', 'src/core/ext/filters/http/http_filters_plugin.cc', - 'src/core/ext/filters/http/message_compress/message_compress_filter.cc', - 'src/core/ext/filters/http/message_compress/message_compress_filter.h', - 'src/core/ext/filters/http/message_compress/message_decompress_filter.cc', - 'src/core/ext/filters/http/message_compress/message_decompress_filter.h', + 'src/core/ext/filters/http/message_compress/compression_filter.cc', + 'src/core/ext/filters/http/message_compress/compression_filter.h', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/message_size/message_size_filter.cc', @@ -1841,8 +1839,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h', 'src/core/ext/filters/http/client/http_client_filter.h', 'src/core/ext/filters/http/client_authority_filter.h', - 'src/core/ext/filters/http/message_compress/message_compress_filter.h', - 'src/core/ext/filters/http/message_compress/message_decompress_filter.h', + 'src/core/ext/filters/http/message_compress/compression_filter.h', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/message_size/message_size_filter.h', 'src/core/ext/filters/rbac/rbac_filter.h', @@ -2641,8 +2638,6 @@ Pod::Spec.new do |s| 'src/core/lib/security/authorization/grpc_authorization_policy_provider.h', 'src/core/lib/security/authorization/rbac_translator.cc', 'src/core/lib/security/authorization/rbac_translator.h', - 'test/core/compression/args_utils.cc', - 'test/core/compression/args_utils.h', 'test/core/end2end/cq_verifier.cc', 'test/core/end2end/cq_verifier.h', 'test/core/end2end/data/client_certs.cc', diff --git a/grpc.gemspec b/grpc.gemspec index d885d13a731..d0d41b550c6 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -222,10 +222,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/http/client_authority_filter.cc ) s.files += %w( src/core/ext/filters/http/client_authority_filter.h ) s.files += %w( src/core/ext/filters/http/http_filters_plugin.cc ) - s.files += %w( src/core/ext/filters/http/message_compress/message_compress_filter.cc ) - s.files += %w( src/core/ext/filters/http/message_compress/message_compress_filter.h ) - s.files += %w( src/core/ext/filters/http/message_compress/message_decompress_filter.cc ) - s.files += %w( src/core/ext/filters/http/message_compress/message_decompress_filter.h ) + s.files += %w( src/core/ext/filters/http/message_compress/compression_filter.cc ) + s.files += %w( src/core/ext/filters/http/message_compress/compression_filter.h ) s.files += %w( src/core/ext/filters/http/server/http_server_filter.cc ) s.files += %w( src/core/ext/filters/http/server/http_server_filter.h ) s.files += %w( src/core/ext/filters/message_size/message_size_filter.cc ) diff --git a/grpc.gyp b/grpc.gyp index bbad5be7ce7..73dabcb0cd7 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -181,7 +181,6 @@ 'grpc_test_util', ], 'sources': [ - 'test/core/compression/args_utils.cc', 'test/core/end2end/cq_verifier.cc', 'test/core/end2end/data/client_certs.cc', 'test/core/end2end/data/server1_cert.cc', @@ -436,8 +435,7 @@ 'src/core/ext/filters/http/client/http_client_filter.cc', 'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/http_filters_plugin.cc', - 'src/core/ext/filters/http/message_compress/message_compress_filter.cc', - 'src/core/ext/filters/http/message_compress/message_decompress_filter.cc', + 'src/core/ext/filters/http/message_compress/compression_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', 'src/core/ext/filters/rbac/rbac_filter.cc', @@ -1198,8 +1196,7 @@ 'src/core/ext/filters/http/client/http_client_filter.cc', 'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/http_filters_plugin.cc', - 'src/core/ext/filters/http/message_compress/message_compress_filter.cc', - 'src/core/ext/filters/http/message_compress/message_decompress_filter.cc', + 'src/core/ext/filters/http/message_compress/compression_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', 'src/core/ext/transport/chttp2/client/chttp2_connector.cc', diff --git a/package.xml b/package.xml index efd4ec4c053..6c114bbceae 100644 --- a/package.xml +++ b/package.xml @@ -204,10 +204,8 @@ - - - - + + diff --git a/src/core/ext/filters/http/http_filters_plugin.cc b/src/core/ext/filters/http/http_filters_plugin.cc index 42964ac5fcc..3cfbaae2da2 100644 --- a/src/core/ext/filters/http/http_filters_plugin.cc +++ b/src/core/ext/filters/http/http_filters_plugin.cc @@ -25,8 +25,7 @@ #include #include "src/core/ext/filters/http/client/http_client_filter.h" -#include "src/core/ext/filters/http/message_compress/message_compress_filter.h" -#include "src/core/ext/filters/http/message_compress/message_decompress_filter.h" +#include "src/core/ext/filters/http/message_compress/compression_filter.h" #include "src/core/ext/filters/http/server/http_server_filter.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_fwd.h" @@ -45,25 +44,24 @@ static bool is_building_http_like_transport( namespace grpc_core { void RegisterHttpFilters(CoreConfiguration::Builder* builder) { - auto optional = [builder](grpc_channel_stack_type channel_type, - bool enable_in_minimal_stack, - const char* control_channel_arg, - const grpc_channel_filter* filter) { + auto compression = [builder](grpc_channel_stack_type channel_type, + const grpc_channel_filter* filter) { builder->channel_init()->RegisterStage( channel_type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - [enable_in_minimal_stack, control_channel_arg, - filter](ChannelStackBuilder* builder) { + [filter](ChannelStackBuilder* builder) { if (!is_building_http_like_transport(builder)) return true; auto args = builder->channel_args(); - const bool enable = args.GetBool(control_channel_arg) - .value_or(enable_in_minimal_stack || - !args.WantMinimalStack()); + const bool enable = + args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION) + .value_or(true) || + args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION) + .value_or(true); if (enable) builder->PrependFilter(filter); return true; }); }; - auto required = [builder](grpc_channel_stack_type channel_type, - const grpc_channel_filter* filter) { + auto http = [builder](grpc_channel_stack_type channel_type, + const grpc_channel_filter* filter) { builder->channel_init()->RegisterStage( channel_type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, [filter](ChannelStackBuilder* builder) { @@ -73,25 +71,11 @@ void RegisterHttpFilters(CoreConfiguration::Builder* builder) { return true; }); }; - // TODO(ctiller): return this flag to true once the promise conversion is - // complete. - static constexpr bool kMinimalStackHasDecompression = false; - optional(GRPC_CLIENT_SUBCHANNEL, false, - GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION, - &grpc_message_compress_filter); - optional(GRPC_CLIENT_DIRECT_CHANNEL, false, - GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION, - &grpc_message_compress_filter); - optional(GRPC_SERVER_CHANNEL, false, GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION, - &grpc_message_compress_filter); - optional(GRPC_CLIENT_SUBCHANNEL, kMinimalStackHasDecompression, - GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, &MessageDecompressFilter); - optional(GRPC_CLIENT_DIRECT_CHANNEL, kMinimalStackHasDecompression, - GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, &MessageDecompressFilter); - optional(GRPC_SERVER_CHANNEL, kMinimalStackHasDecompression, - GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, &MessageDecompressFilter); - required(GRPC_CLIENT_SUBCHANNEL, &HttpClientFilter::kFilter); - required(GRPC_CLIENT_DIRECT_CHANNEL, &HttpClientFilter::kFilter); - required(GRPC_SERVER_CHANNEL, &HttpServerFilter::kFilter); + compression(GRPC_CLIENT_SUBCHANNEL, &ClientCompressionFilter::kFilter); + compression(GRPC_CLIENT_DIRECT_CHANNEL, &ClientCompressionFilter::kFilter); + compression(GRPC_SERVER_CHANNEL, &ServerCompressionFilter::kFilter); + http(GRPC_CLIENT_SUBCHANNEL, &HttpClientFilter::kFilter); + http(GRPC_CLIENT_DIRECT_CHANNEL, &HttpClientFilter::kFilter); + http(GRPC_SERVER_CHANNEL, &HttpServerFilter::kFilter); } } // namespace grpc_core diff --git a/src/core/ext/filters/http/message_compress/compression_filter.cc b/src/core/ext/filters/http/message_compress/compression_filter.cc new file mode 100644 index 00000000000..07ce15e9d42 --- /dev/null +++ b/src/core/ext/filters/http/message_compress/compression_filter.cc @@ -0,0 +1,315 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/ext/filters/http/message_compress/compression_filter.h" + +#include + +#include +#include +#include +#include + +#include "absl/meta/type_traits.h" +#include "absl/status/status.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/str_format.h" +#include "absl/types/optional.h" + +#include +#include +#include +#include + +#include "src/core/ext/filters/message_size/message_size_filter.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/context.h" +#include "src/core/lib/channel/promise_based_filter.h" +#include "src/core/lib/compression/compression_internal.h" +#include "src/core/lib/compression/message_compress.h" +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/promise/context.h" +#include "src/core/lib/promise/for_each.h" +#include "src/core/lib/promise/latch.h" +#include "src/core/lib/promise/map_pipe.h" +#include "src/core/lib/promise/promise.h" +#include "src/core/lib/promise/seq.h" +#include "src/core/lib/promise/try_concurrently.h" +#include "src/core/lib/promise/try_seq.h" +#include "src/core/lib/resource_quota/arena.h" +#include "src/core/lib/slice/slice_buffer.h" +#include "src/core/lib/surface/call.h" +#include "src/core/lib/transport/metadata_batch.h" +#include "src/core/lib/transport/transport.h" + +namespace grpc_core { + +const grpc_channel_filter ClientCompressionFilter::kFilter = + MakePromiseBasedFilter("compression"); +const grpc_channel_filter ServerCompressionFilter::kFilter = + MakePromiseBasedFilter("compression"); + +absl::StatusOr ClientCompressionFilter::Create( + const ChannelArgs& args, ChannelFilter::Args) { + return ClientCompressionFilter(args); +} + +absl::StatusOr ServerCompressionFilter::Create( + const ChannelArgs& args, ChannelFilter::Args) { + return ServerCompressionFilter(args); +} + +CompressionFilter::CompressionFilter(const ChannelArgs& args) + : max_recv_size_(GetMaxRecvSizeFromChannelArgs(args)), + message_size_service_config_parser_index_( + MessageSizeParser::ParserIndex()), + default_compression_algorithm_( + DefaultCompressionAlgorithmFromChannelArgs(args).value_or( + GRPC_COMPRESS_NONE)), + enabled_compression_algorithms_( + CompressionAlgorithmSet::FromChannelArgs(args)), + enable_compression_( + args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION).value_or(true)), + enable_decompression_( + args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION) + .value_or(true)) { + // Make sure the default is enabled. + if (!enabled_compression_algorithms_.IsSet(default_compression_algorithm_)) { + const char* name; + if (!grpc_compression_algorithm_name(default_compression_algorithm_, + &name)) { + name = ""; + } + gpr_log(GPR_ERROR, + "default compression algorithm %s not enabled: switching to none", + name); + default_compression_algorithm_ = GRPC_COMPRESS_NONE; + } +} + +MessageHandle CompressionFilter::CompressMessage( + MessageHandle message, grpc_compression_algorithm algorithm) const { + if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { + gpr_log(GPR_ERROR, "CompressMessage: len=%" PRIdPTR " alg=%d flags=%d", + message->payload()->Length(), algorithm, message->flags()); + } + // Check if we're allowed to compress this message + // (apps might want to disable compression for certain messages to avoid + // crime/beast like vulns). + uint32_t& flags = message->mutable_flags(); + if (algorithm == GRPC_COMPRESS_NONE || !enable_compression_ || + (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS))) { + return message; + } + // Try to compress the payload. + SliceBuffer tmp; + SliceBuffer* payload = message->payload(); + bool did_compress = grpc_msg_compress(algorithm, payload->c_slice_buffer(), + tmp.c_slice_buffer()); + // If we achieved compression send it as compressed, otherwise send it as (to + // avoid spending cycles on the receiver decompressing). + if (did_compress) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { + const char* algo_name; + const size_t before_size = payload->Length(); + const size_t after_size = tmp.Length(); + const float savings_ratio = 1.0f - static_cast(after_size) / + static_cast(before_size); + GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name)); + gpr_log(GPR_INFO, + "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR + " bytes (%.2f%% savings)", + algo_name, before_size, after_size, 100 * savings_ratio); + } + tmp.Swap(payload); + flags |= GRPC_WRITE_INTERNAL_COMPRESS; + } else { + if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { + const char* algo_name; + GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name)); + gpr_log(GPR_INFO, + "Algorithm '%s' enabled but decided not to compress. Input size: " + "%" PRIuPTR, + algo_name, payload->Length()); + } + } + return message; +} + +absl::StatusOr CompressionFilter::DecompressMessage( + MessageHandle message, grpc_compression_algorithm algorithm, + absl::optional max_recv_message_length) const { + if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { + gpr_log(GPR_ERROR, "DecompressMessage: len=%" PRIdPTR " max=%d alg=%d", + message->payload()->Length(), max_recv_message_length.value_or(-1), + algorithm); + } + // Check max message length. + if (max_recv_message_length.has_value() && + message->payload()->Length() > + static_cast(*max_recv_message_length)) { + return absl::ResourceExhaustedError(absl::StrFormat( + "Received message larger than max (%u vs. %d)", + message->payload()->Length(), *max_recv_message_length)); + } + // Check if decompression is enabled (if not, we can just pass the message + // up). + if (!enable_decompression_ || + (message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) == 0) { + return std::move(message); + } + // Try to decompress the payload. + SliceBuffer decompressed_slices; + if (grpc_msg_decompress(algorithm, message->payload()->c_slice_buffer(), + decompressed_slices.c_slice_buffer()) == 0) { + return absl::InternalError( + absl::StrCat("Unexpected error decompressing data for algorithm ", + CompressionAlgorithmAsString(algorithm))); + } + // Swap the decompressed slices into the message. + message->payload()->Swap(&decompressed_slices); + message->mutable_flags() &= ~GRPC_WRITE_INTERNAL_COMPRESS; + message->mutable_flags() |= GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED; + return std::move(message); +} + +class CompressionFilter::DecompressLoop { + public: + explicit DecompressLoop(CompressionFilter* filter, CallArgs& call_args) + : filter_(filter), + mapper_(PipeMapper::Intercept( + *call_args.incoming_messages)) {} + + // Once we have a compression algorithm we can construct the decompression + // loop. + // Returns a promise that resolves to MessageHandle. + auto TakeAndRun(grpc_compression_algorithm algorithm) { + // Configure max receive size. + auto max_recv_message_length = filter_->max_recv_size_; + const MessageSizeParsedConfig* limits = + MessageSizeParsedConfig::GetFromCallContext( + GetContext(), + filter_->message_size_service_config_parser_index_); + if (limits != nullptr && limits->max_recv_size().has_value() && + (!max_recv_message_length.has_value() || + *limits->max_recv_size() < *max_recv_message_length)) { + max_recv_message_length = *limits->max_recv_size(); + } + // Interject decompression into the message loop. + return mapper_.TakeAndRun([algorithm, max_recv_message_length, + filter = filter_](MessageHandle message) { + return filter->DecompressMessage(std::move(message), algorithm, + max_recv_message_length); + }); + } + + private: + CompressionFilter* filter_; + PipeMapper mapper_; +}; + +class CompressionFilter::CompressLoop { + public: + explicit CompressLoop(CompressionFilter* filter, CallArgs& call_args) + : filter_(filter), + mapper_(PipeMapper::Intercept( + *call_args.outgoing_messages)) {} + + // Once we're ready to send initial metadata we can construct the compression + // loop. + // Returns a promise that resolves to MessageHandle. + auto TakeAndRun(grpc_metadata_batch& outgoing_metadata) { + const auto algorithm = + outgoing_metadata.Take(GrpcInternalEncodingRequest()) + .value_or(filter_->default_compression_algorithm()); + // Convey supported compression algorithms. + outgoing_metadata.Set(GrpcAcceptEncodingMetadata(), + filter_->enabled_compression_algorithms()); + if (algorithm != GRPC_COMPRESS_NONE) { + outgoing_metadata.Set(GrpcEncodingMetadata(), algorithm); + } + // Interject compression into the message loop. + return mapper_.TakeAndRun([filter = filter_, algorithm](MessageHandle m) { + return filter->CompressMessage(std::move(m), algorithm); + }); + } + + private: + CompressionFilter* filter_; + PipeMapper mapper_; +}; + +ArenaPromise ClientCompressionFilter::MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) { + auto compress_loop = CompressLoop(this, call_args) + .TakeAndRun(*call_args.client_initial_metadata); + DecompressLoop decompress_loop(this, call_args); + auto* server_initial_metadata = call_args.server_initial_metadata; + // Concurrently: + // - call the next filter + // - wait for initial metadata from the server and then commence decompression + // - compress outgoing messages + return TryConcurrently(next_promise_factory(std::move(call_args))) + .NecessaryPull(Seq(server_initial_metadata->Wait(), + [decompress_loop = std::move(decompress_loop)]( + ServerMetadata** server_initial_metadata) mutable + -> ArenaPromise { + if (*server_initial_metadata == nullptr) { + return ImmediateOkStatus(); + } + return decompress_loop.TakeAndRun( + (*server_initial_metadata) + ->get(GrpcEncodingMetadata()) + .value_or(GRPC_COMPRESS_NONE)); + })) + .Push(std::move(compress_loop)); +} + +ArenaPromise ServerCompressionFilter::MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) { + CompressLoop compress_loop(this, call_args); + auto decompress_loop = DecompressLoop(this, call_args) + .TakeAndRun(call_args.client_initial_metadata + ->get(GrpcEncodingMetadata()) + .value_or(GRPC_COMPRESS_NONE)); + auto* read_latch = GetContext()->New>(); + auto* write_latch = + std::exchange(call_args.server_initial_metadata, read_latch); + // Concurrently: + // - call the next filter + // - decompress incoming messages + // - wait for initial metadata to be sent, and then commence compression of + // outgoing messages + return TryConcurrently(next_promise_factory(std::move(call_args))) + .Pull(std::move(decompress_loop)) + .Push(Seq(read_latch->Wait(), + [write_latch, compress_loop = std::move(compress_loop)]( + ServerMetadata** md) mutable { + // Find the compression algorithm. + auto loop = compress_loop.TakeAndRun(**md); + write_latch->Set(*md); + return loop; + })); +} + +} // namespace grpc_core diff --git a/src/core/ext/filters/http/message_compress/compression_filter.h b/src/core/ext/filters/http/message_compress/compression_filter.h new file mode 100644 index 00000000000..df88736b81b --- /dev/null +++ b/src/core/ext/filters/http/message_compress/compression_filter.h @@ -0,0 +1,133 @@ +// +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_COMPRESSION_FILTER_H +#define GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_COMPRESSION_FILTER_H + +#include + +#include +#include + +#include "absl/status/statusor.h" +#include "absl/types/optional.h" + +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_fwd.h" +#include "src/core/lib/channel/promise_based_filter.h" +#include "src/core/lib/compression/compression_internal.h" +#include "src/core/lib/promise/arena_promise.h" +#include "src/core/lib/transport/transport.h" + +namespace grpc_core { + +/** Compression filter for messages. + * + * See for the available compression settings. + * + * Compression settings may come from: + * - Channel configuration, as established at channel creation time. + * - The metadata accompanying the outgoing data to be compressed. This is + * taken as a request only. We may choose not to honor it. The metadata key + * is given by \a GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY. + * + * Compression can be disabled for concrete messages (for instance in order to + * prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set in + * the MessageHandle flags. + * + * The attempted compression mechanism is added to the resulting initial + * metadata under the 'grpc-encoding' key. + * + * If compression is actually performed, the MessageHandle's flag is modified to + * incorporate GRPC_WRITE_INTERNAL_COMPRESS. Otherwise, and regardless of the + * aforementioned 'grpc-encoding' metadata value, data will pass through + * uncompressed. */ + +class CompressionFilter : public ChannelFilter { + protected: + explicit CompressionFilter(const ChannelArgs& args); + + class CompressLoop; + class DecompressLoop; + + grpc_compression_algorithm default_compression_algorithm() const { + return default_compression_algorithm_; + } + + CompressionAlgorithmSet enabled_compression_algorithms() const { + return enabled_compression_algorithms_; + } + + private: + // Compress one message synchronously. + MessageHandle CompressMessage(MessageHandle message, + grpc_compression_algorithm algorithm) const; + // Decompress one message synchronously. + absl::StatusOr DecompressMessage( + MessageHandle message, grpc_compression_algorithm algorithm, + absl::optional max_recv_message_length) const; + + // Max receive message length, if set. + absl::optional max_recv_size_; + size_t message_size_service_config_parser_index_; + // The default, channel-level, compression algorithm. + grpc_compression_algorithm default_compression_algorithm_; + // Enabled compression algorithms. + CompressionAlgorithmSet enabled_compression_algorithms_; + // Is compression enabled? + bool enable_compression_; + // Is decompression enabled? + bool enable_decompression_; +}; + +class ClientCompressionFilter final : public CompressionFilter { + public: + static const grpc_channel_filter kFilter; + + static absl::StatusOr Create( + const ChannelArgs& args, ChannelFilter::Args filter_args); + + // Construct a promise for one call. + ArenaPromise MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) override; + + private: + using CompressionFilter::CompressionFilter; +}; + +class ServerCompressionFilter final : public CompressionFilter { + public: + static const grpc_channel_filter kFilter; + + static absl::StatusOr Create( + const ChannelArgs& args, ChannelFilter::Args filter_args); + + // Construct a promise for one call. + ArenaPromise MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) override; + + private: + using CompressionFilter::CompressionFilter; +}; + +} // namespace grpc_core + +#endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_COMPRESSION_FILTER_H \ + */ diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc deleted file mode 100644 index 6cf2dcf522e..00000000000 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ /dev/null @@ -1,332 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include - -#include "src/core/ext/filters/http/message_compress/message_compress_filter.h" - -#include -#include - -#include -#include - -#include "absl/meta/type_traits.h" -#include "absl/status/status.h" -#include "absl/types/optional.h" - -#include -#include -#include -#include - -#include "src/core/lib/compression/compression_internal.h" -#include "src/core/lib/compression/message_compress.h" -#include "src/core/lib/debug/trace.h" -#include "src/core/lib/iomgr/call_combiner.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/error.h" -#include "src/core/lib/slice/slice_buffer.h" -#include "src/core/lib/surface/call.h" -#include "src/core/lib/transport/metadata_batch.h" -#include "src/core/lib/transport/transport.h" - -namespace { - -class ChannelData { - public: - explicit ChannelData(grpc_channel_element_args* args) { - // Get the enabled and the default algorithms from channel args. - enabled_compression_algorithms_ = - grpc_core::CompressionAlgorithmSet::FromChannelArgs(args->channel_args); - default_compression_algorithm_ = - grpc_core::DefaultCompressionAlgorithmFromChannelArgs( - args->channel_args) - .value_or(GRPC_COMPRESS_NONE); - // Make sure the default is enabled. - if (!enabled_compression_algorithms_.IsSet( - default_compression_algorithm_)) { - const char* name; - if (!grpc_compression_algorithm_name(default_compression_algorithm_, - &name)) { - name = ""; - } - gpr_log(GPR_ERROR, - "default compression algorithm %s not enabled: switching to none", - name); - default_compression_algorithm_ = GRPC_COMPRESS_NONE; - } - GPR_ASSERT(!args->is_last); - } - - grpc_compression_algorithm default_compression_algorithm() const { - return default_compression_algorithm_; - } - - grpc_core::CompressionAlgorithmSet enabled_compression_algorithms() const { - return enabled_compression_algorithms_; - } - - private: - /** The default, channel-level, compression algorithm */ - grpc_compression_algorithm default_compression_algorithm_; - /** Enabled compression algorithms */ - grpc_core::CompressionAlgorithmSet enabled_compression_algorithms_; -}; - -class CallData { - public: - CallData(grpc_call_element* elem, const grpc_call_element_args& args) - : call_combiner_(args.call_combiner) { - ChannelData* channeld = static_cast(elem->channel_data); - // The call's message compression algorithm is set to channel's default - // setting. It can be overridden later by initial metadata. - if (GPR_LIKELY(channeld->enabled_compression_algorithms().IsSet( - channeld->default_compression_algorithm()))) { - compression_algorithm_ = channeld->default_compression_algorithm(); - } - GRPC_CLOSURE_INIT(&forward_send_message_batch_in_call_combiner_, - ForwardSendMessageBatch, elem, grpc_schedule_on_exec_ctx); - } - - ~CallData() {} - - void CompressStartTransportStreamOpBatch( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch); - - private: - bool SkipMessageCompression(); - void FinishSendMessage(grpc_call_element* elem); - - void ProcessSendInitialMetadata(grpc_call_element* elem, - grpc_metadata_batch* initial_metadata); - - // Methods for processing a send_message batch - static void FailSendMessageBatchInCallCombiner(void* calld_arg, - grpc_error_handle error); - static void ForwardSendMessageBatch(void* elem_arg, grpc_error_handle unused); - - grpc_core::CallCombiner* call_combiner_; - grpc_compression_algorithm compression_algorithm_ = GRPC_COMPRESS_NONE; - grpc_error_handle cancel_error_; - grpc_transport_stream_op_batch* send_message_batch_ = nullptr; - bool seen_initial_metadata_ = false; - grpc_closure forward_send_message_batch_in_call_combiner_; -}; - -// Returns true if we should skip message compression for the current message. -bool CallData::SkipMessageCompression() { - // If the flags of this message indicate that it shouldn't be compressed, we - // skip message compression. - uint32_t flags = send_message_batch_->payload->send_message.flags; - if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) { - return true; - } - // If this call doesn't have any message compression algorithm set, skip - // message compression. - return compression_algorithm_ == GRPC_COMPRESS_NONE; -} - -void CallData::ProcessSendInitialMetadata( - grpc_call_element* elem, grpc_metadata_batch* initial_metadata) { - ChannelData* channeld = static_cast(elem->channel_data); - // Find the compression algorithm. - compression_algorithm_ = - initial_metadata->Take(grpc_core::GrpcInternalEncodingRequest()) - .value_or(channeld->default_compression_algorithm()); - switch (compression_algorithm_) { - case GRPC_COMPRESS_NONE: - break; - case GRPC_COMPRESS_DEFLATE: - case GRPC_COMPRESS_GZIP: - initial_metadata->Set(grpc_core::GrpcEncodingMetadata(), - compression_algorithm_); - break; - case GRPC_COMPRESS_ALGORITHMS_COUNT: - abort(); - } - // Convey supported compression algorithms. - initial_metadata->Set(grpc_core::GrpcAcceptEncodingMetadata(), - channeld->enabled_compression_algorithms()); -} - -void CallData::FinishSendMessage(grpc_call_element* elem) { - // Compress the data if appropriate. - if (!SkipMessageCompression()) { - grpc_core::SliceBuffer tmp; - uint32_t& send_flags = send_message_batch_->payload->send_message.flags; - grpc_core::SliceBuffer* payload = - send_message_batch_->payload->send_message.send_message; - bool did_compress = - grpc_msg_compress(compression_algorithm_, payload->c_slice_buffer(), - tmp.c_slice_buffer()); - if (did_compress) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { - const char* algo_name; - const size_t before_size = payload->Length(); - const size_t after_size = tmp.Length(); - const float savings_ratio = 1.0f - static_cast(after_size) / - static_cast(before_size); - GPR_ASSERT(grpc_compression_algorithm_name(compression_algorithm_, - &algo_name)); - gpr_log(GPR_INFO, - "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR - " bytes (%.2f%% savings)", - algo_name, before_size, after_size, 100 * savings_ratio); - } - tmp.Swap(payload); - send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; - } else { - if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { - const char* algo_name; - GPR_ASSERT(grpc_compression_algorithm_name(compression_algorithm_, - &algo_name)); - gpr_log( - GPR_INFO, - "Algorithm '%s' enabled but decided not to compress. Input size: " - "%" PRIuPTR, - algo_name, payload->Length()); - } - } - } - grpc_call_next_op(elem, std::exchange(send_message_batch_, nullptr)); -} - -void CallData::FailSendMessageBatchInCallCombiner(void* calld_arg, - grpc_error_handle error) { - CallData* calld = static_cast(calld_arg); - if (calld->send_message_batch_ != nullptr) { - grpc_transport_stream_op_batch_finish_with_failure( - calld->send_message_batch_, error, calld->call_combiner_); - calld->send_message_batch_ = nullptr; - } -} - -void CallData::ForwardSendMessageBatch(void* elem_arg, - grpc_error_handle /*unused*/) { - grpc_call_element* elem = static_cast(elem_arg); - CallData* calld = static_cast(elem->call_data); - calld->FinishSendMessage(elem); -} - -void CallData::CompressStartTransportStreamOpBatch( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { - // Handle cancel_stream. - if (batch->cancel_stream) { - cancel_error_ = batch->payload->cancel_stream.cancel_error; - if (send_message_batch_ != nullptr) { - if (!seen_initial_metadata_) { - GRPC_CALL_COMBINER_START( - call_combiner_, - GRPC_CLOSURE_CREATE(FailSendMessageBatchInCallCombiner, this, - grpc_schedule_on_exec_ctx), - cancel_error_, "failing send_message op"); - } - } - } else if (!cancel_error_.ok()) { - grpc_transport_stream_op_batch_finish_with_failure(batch, cancel_error_, - call_combiner_); - return; - } - // Handle send_initial_metadata. - if (batch->send_initial_metadata) { - GPR_ASSERT(!seen_initial_metadata_); - ProcessSendInitialMetadata( - elem, batch->payload->send_initial_metadata.send_initial_metadata); - seen_initial_metadata_ = true; - // If we had previously received a batch containing a send_message op, - // handle it now. Note that we need to re-enter the call combiner - // for this, since we can't send two batches down while holding the - // call combiner, since the connected_channel filter (at the bottom of - // the call stack) will release the call combiner for each batch it sees. - if (send_message_batch_ != nullptr) { - GRPC_CALL_COMBINER_START( - call_combiner_, &forward_send_message_batch_in_call_combiner_, - absl::OkStatus(), - "starting send_message after send_initial_metadata"); - } - } - // Handle send_message. - if (batch->send_message) { - GPR_ASSERT(send_message_batch_ == nullptr); - send_message_batch_ = batch; - // If we have not yet seen send_initial_metadata, then we have to - // wait. We save the batch and then drop the call combiner, which we'll - // have to pick up again later when we get send_initial_metadata. - if (!seen_initial_metadata_) { - GRPC_CALL_COMBINER_STOP( - call_combiner_, "send_message batch pending send_initial_metadata"); - return; - } - FinishSendMessage(elem); - } else { - // Pass control down the stack. - grpc_call_next_op(elem, batch); - } -} - -void CompressStartTransportStreamOpBatch( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { - CallData* calld = static_cast(elem->call_data); - calld->CompressStartTransportStreamOpBatch(elem, batch); -} - -/* Constructor for call_data */ -grpc_error_handle CompressInitCallElem(grpc_call_element* elem, - const grpc_call_element_args* args) { - new (elem->call_data) CallData(elem, *args); - return absl::OkStatus(); -} - -/* Destructor for call_data */ -void CompressDestroyCallElem(grpc_call_element* elem, - const grpc_call_final_info* /*final_info*/, - grpc_closure* /*ignored*/) { - CallData* calld = static_cast(elem->call_data); - calld->~CallData(); -} - -/* Constructor for ChannelData */ -grpc_error_handle CompressInitChannelElem(grpc_channel_element* elem, - grpc_channel_element_args* args) { - new (elem->channel_data) ChannelData(args); - return absl::OkStatus(); -} - -/* Destructor for channel data */ -void CompressDestroyChannelElem(grpc_channel_element* elem) { - ChannelData* channeld = static_cast(elem->channel_data); - channeld->~ChannelData(); -} - -} // namespace - -const grpc_channel_filter grpc_message_compress_filter = { - CompressStartTransportStreamOpBatch, - nullptr, - grpc_channel_next_op, - sizeof(CallData), - CompressInitCallElem, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - CompressDestroyCallElem, - sizeof(ChannelData), - CompressInitChannelElem, - grpc_channel_stack_no_post_init, - CompressDestroyChannelElem, - grpc_channel_next_get_info, - "message_compress"}; diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.h b/src/core/ext/filters/http/message_compress/message_compress_filter.h deleted file mode 100644 index 4593a276af5..00000000000 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_COMPRESS_FILTER_H -#define GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_COMPRESS_FILTER_H - -#include - -#include "src/core/lib/channel/channel_fwd.h" -#include "src/core/lib/channel/channel_stack.h" - -/** Compression filter for outgoing data. - * - * See for the available compression settings. - * - * Compression settings may come from: - * - Channel configuration, as established at channel creation time. - * - The metadata accompanying the outgoing data to be compressed. This is - * taken as a request only. We may choose not to honor it. The metadata key - * is given by \a GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY. - * - * Compression can be disabled for concrete messages (for instance in order to - * prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set in - * the BEGIN_MESSAGE flags. - * - * The attempted compression mechanism is added to the resulting initial - * metadata under the'grpc-encoding' key. - * - * If compression is actually performed, BEGIN_MESSAGE's flag is modified to - * incorporate GRPC_WRITE_INTERNAL_COMPRESS. Otherwise, and regardless of the - * aforementioned 'grpc-encoding' metadata value, data will pass through - * uncompressed. */ - -extern const grpc_channel_filter grpc_message_compress_filter; - -#endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_COMPRESS_FILTER_H \ - */ diff --git a/src/core/ext/filters/http/message_compress/message_decompress_filter.cc b/src/core/ext/filters/http/message_compress/message_decompress_filter.cc deleted file mode 100644 index 73a37b4d41f..00000000000 --- a/src/core/ext/filters/http/message_compress/message_decompress_filter.cc +++ /dev/null @@ -1,322 +0,0 @@ -// -// -// Copyright 2020 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// - -#include - -#include "src/core/ext/filters/http/message_compress/message_decompress_filter.h" - -#include - -#include -#include - -#include "absl/status/status.h" -#include "absl/strings/str_cat.h" -#include "absl/strings/str_format.h" -#include "absl/types/optional.h" - -#include -#include -#include - -#include "src/core/ext/filters/message_size/message_size_filter.h" -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/compression/message_compress.h" -#include "src/core/lib/gprpp/debug_location.h" -#include "src/core/lib/gprpp/status_helper.h" -#include "src/core/lib/iomgr/call_combiner.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/error.h" -#include "src/core/lib/slice/slice_buffer.h" -#include "src/core/lib/transport/metadata_batch.h" -#include "src/core/lib/transport/transport.h" - -namespace grpc_core { -namespace { - -class ChannelData { - public: - explicit ChannelData(const grpc_channel_element_args* args) - : max_recv_size_(GetMaxRecvSizeFromChannelArgs( - ChannelArgs::FromC(args->channel_args))), - message_size_service_config_parser_index_( - MessageSizeParser::ParserIndex()) {} - - absl::optional max_recv_size() const { return max_recv_size_; } - size_t message_size_service_config_parser_index() const { - return message_size_service_config_parser_index_; - } - - private: - absl::optional max_recv_size_; - const size_t message_size_service_config_parser_index_; -}; - -class CallData { - public: - CallData(const grpc_call_element_args& args, const ChannelData* chand) - : call_combiner_(args.call_combiner), - max_recv_message_length_(chand->max_recv_size()) { - // Initialize state for recv_initial_metadata_ready callback - GRPC_CLOSURE_INIT(&on_recv_initial_metadata_ready_, - OnRecvInitialMetadataReady, this, - grpc_schedule_on_exec_ctx); - // Initialize state for recv_message_ready callback - GRPC_CLOSURE_INIT(&on_recv_message_ready_, OnRecvMessageReady, this, - grpc_schedule_on_exec_ctx); - // Initialize state for recv_trailing_metadata_ready callback - GRPC_CLOSURE_INIT(&on_recv_trailing_metadata_ready_, - OnRecvTrailingMetadataReady, this, - grpc_schedule_on_exec_ctx); - const MessageSizeParsedConfig* limits = - MessageSizeParsedConfig::GetFromCallContext( - args.context, chand->message_size_service_config_parser_index()); - if (limits != nullptr && limits->max_recv_size().has_value() && - (!max_recv_message_length_.has_value() || - *limits->max_recv_size() < *max_recv_message_length_)) { - max_recv_message_length_ = *limits->max_recv_size(); - } - } - - void DecompressStartTransportStreamOpBatch( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch); - - private: - static void OnRecvInitialMetadataReady(void* arg, grpc_error_handle error); - - // Methods for processing a receive message event - void MaybeResumeOnRecvMessageReady(); - static void OnRecvMessageReady(void* arg, grpc_error_handle error); - void ContinueRecvMessageReadyCallback(grpc_error_handle error); - - // Methods for processing a recv_trailing_metadata event - void MaybeResumeOnRecvTrailingMetadataReady(); - static void OnRecvTrailingMetadataReady(void* arg, grpc_error_handle error); - - CallCombiner* call_combiner_; - // Overall error for the call - grpc_error_handle error_; - // Fields for handling recv_initial_metadata_ready callback - grpc_closure on_recv_initial_metadata_ready_; - grpc_closure* original_recv_initial_metadata_ready_ = nullptr; - grpc_metadata_batch* recv_initial_metadata_ = nullptr; - // Fields for handling recv_message_ready callback - bool seen_recv_message_ready_ = false; - absl::optional max_recv_message_length_; - grpc_compression_algorithm algorithm_ = GRPC_COMPRESS_NONE; - absl::optional* recv_message_ = nullptr; - uint32_t* recv_message_flags_ = nullptr; - grpc_closure on_recv_message_ready_; - grpc_closure* original_recv_message_ready_ = nullptr; - // Fields for handling recv_trailing_metadata_ready callback - bool seen_recv_trailing_metadata_ready_ = false; - grpc_closure on_recv_trailing_metadata_ready_; - grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; - grpc_error_handle on_recv_trailing_metadata_ready_error_; -}; - -void CallData::OnRecvInitialMetadataReady(void* arg, grpc_error_handle error) { - CallData* calld = static_cast(arg); - if (error.ok()) { - calld->algorithm_ = - calld->recv_initial_metadata_->get(GrpcEncodingMetadata()) - .value_or(GRPC_COMPRESS_NONE); - } - calld->MaybeResumeOnRecvMessageReady(); - calld->MaybeResumeOnRecvTrailingMetadataReady(); - grpc_closure* closure = calld->original_recv_initial_metadata_ready_; - calld->original_recv_initial_metadata_ready_ = nullptr; - Closure::Run(DEBUG_LOCATION, closure, error); -} - -void CallData::MaybeResumeOnRecvMessageReady() { - if (seen_recv_message_ready_) { - seen_recv_message_ready_ = false; - GRPC_CALL_COMBINER_START(call_combiner_, &on_recv_message_ready_, - absl::OkStatus(), - "continue recv_message_ready callback"); - } -} - -void CallData::OnRecvMessageReady(void* arg, grpc_error_handle error) { - CallData* calld = static_cast(arg); - if (error.ok()) { - if (calld->original_recv_initial_metadata_ready_ != nullptr) { - calld->seen_recv_message_ready_ = true; - GRPC_CALL_COMBINER_STOP(calld->call_combiner_, - "Deferring OnRecvMessageReady until after " - "OnRecvInitialMetadataReady"); - return; - } - if (calld->algorithm_ != GRPC_COMPRESS_NONE) { - // recv_message can be NULL if trailing metadata is received instead of - // message, or it's possible that the message was not compressed. - if (!calld->recv_message_->has_value() || - (*calld->recv_message_)->Length() == 0 || - ((*calld->recv_message_flags_ & GRPC_WRITE_INTERNAL_COMPRESS) == 0)) { - return calld->ContinueRecvMessageReadyCallback(absl::OkStatus()); - } - if (calld->max_recv_message_length_.has_value() && - (*calld->recv_message_)->Length() > - static_cast(*calld->max_recv_message_length_)) { - GPR_DEBUG_ASSERT(calld->error_.ok()); - calld->error_ = grpc_error_set_int( - GRPC_ERROR_CREATE( - absl::StrFormat("Received message larger than max (%u vs. %d)", - (*calld->recv_message_)->Length(), - *calld->max_recv_message_length_)), - StatusIntProperty::kRpcStatus, GRPC_STATUS_RESOURCE_EXHAUSTED); - return calld->ContinueRecvMessageReadyCallback(calld->error_); - } - SliceBuffer decompressed_slices; - if (grpc_msg_decompress(calld->algorithm_, - (*calld->recv_message_)->c_slice_buffer(), - decompressed_slices.c_slice_buffer()) == 0) { - GPR_DEBUG_ASSERT(calld->error_.ok()); - calld->error_ = GRPC_ERROR_CREATE(absl::StrCat( - "Unexpected error decompressing data for algorithm with " - "enum value ", - calld->algorithm_)); - } else { - *calld->recv_message_flags_ = - (*calld->recv_message_flags_ & (~GRPC_WRITE_INTERNAL_COMPRESS)) | - GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED; - (*calld->recv_message_)->Swap(&decompressed_slices); - } - return calld->ContinueRecvMessageReadyCallback(calld->error_); - } - } - calld->ContinueRecvMessageReadyCallback(error); -} - -void CallData::ContinueRecvMessageReadyCallback(grpc_error_handle error) { - MaybeResumeOnRecvTrailingMetadataReady(); - // The surface will clean up the receiving stream if there is an error. - grpc_closure* closure = original_recv_message_ready_; - original_recv_message_ready_ = nullptr; - Closure::Run(DEBUG_LOCATION, closure, error); -} - -void CallData::MaybeResumeOnRecvTrailingMetadataReady() { - if (seen_recv_trailing_metadata_ready_) { - seen_recv_trailing_metadata_ready_ = false; - grpc_error_handle error = on_recv_trailing_metadata_ready_error_; - on_recv_trailing_metadata_ready_error_ = absl::OkStatus(); - GRPC_CALL_COMBINER_START(call_combiner_, &on_recv_trailing_metadata_ready_, - error, "Continuing OnRecvTrailingMetadataReady"); - } -} - -void CallData::OnRecvTrailingMetadataReady(void* arg, grpc_error_handle error) { - CallData* calld = static_cast(arg); - if (calld->original_recv_initial_metadata_ready_ != nullptr || - calld->original_recv_message_ready_ != nullptr) { - calld->seen_recv_trailing_metadata_ready_ = true; - calld->on_recv_trailing_metadata_ready_error_ = error; - GRPC_CALL_COMBINER_STOP( - calld->call_combiner_, - "Deferring OnRecvTrailingMetadataReady until after " - "OnRecvInitialMetadataReady and OnRecvMessageReady"); - return; - } - error = grpc_error_add_child(error, calld->error_); - calld->error_ = absl::OkStatus(); - grpc_closure* closure = calld->original_recv_trailing_metadata_ready_; - calld->original_recv_trailing_metadata_ready_ = nullptr; - Closure::Run(DEBUG_LOCATION, closure, error); -} - -void CallData::DecompressStartTransportStreamOpBatch( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { - // Handle recv_initial_metadata. - if (batch->recv_initial_metadata) { - recv_initial_metadata_ = - batch->payload->recv_initial_metadata.recv_initial_metadata; - original_recv_initial_metadata_ready_ = - batch->payload->recv_initial_metadata.recv_initial_metadata_ready; - batch->payload->recv_initial_metadata.recv_initial_metadata_ready = - &on_recv_initial_metadata_ready_; - } - // Handle recv_message - if (batch->recv_message) { - recv_message_ = batch->payload->recv_message.recv_message; - recv_message_flags_ = batch->payload->recv_message.flags; - original_recv_message_ready_ = - batch->payload->recv_message.recv_message_ready; - batch->payload->recv_message.recv_message_ready = &on_recv_message_ready_; - } - // Handle recv_trailing_metadata - if (batch->recv_trailing_metadata) { - original_recv_trailing_metadata_ready_ = - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &on_recv_trailing_metadata_ready_; - } - // Pass control down the stack. - grpc_call_next_op(elem, batch); -} - -void DecompressStartTransportStreamOpBatch( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { - CallData* calld = static_cast(elem->call_data); - calld->DecompressStartTransportStreamOpBatch(elem, batch); -} - -grpc_error_handle DecompressInitCallElem(grpc_call_element* elem, - const grpc_call_element_args* args) { - ChannelData* chand = static_cast(elem->channel_data); - new (elem->call_data) CallData(*args, chand); - return absl::OkStatus(); -} - -void DecompressDestroyCallElem(grpc_call_element* elem, - const grpc_call_final_info* /*final_info*/, - grpc_closure* /*ignored*/) { - CallData* calld = static_cast(elem->call_data); - calld->~CallData(); -} - -grpc_error_handle DecompressInitChannelElem(grpc_channel_element* elem, - grpc_channel_element_args* args) { - ChannelData* chand = static_cast(elem->channel_data); - new (chand) ChannelData(args); - return absl::OkStatus(); -} - -void DecompressDestroyChannelElem(grpc_channel_element* elem) { - ChannelData* chand = static_cast(elem->channel_data); - chand->~ChannelData(); -} - -} // namespace - -const grpc_channel_filter MessageDecompressFilter = { - DecompressStartTransportStreamOpBatch, - nullptr, - grpc_channel_next_op, - sizeof(CallData), - DecompressInitCallElem, - grpc_call_stack_ignore_set_pollset_or_pollset_set, - DecompressDestroyCallElem, - sizeof(ChannelData), - DecompressInitChannelElem, - grpc_channel_stack_no_post_init, - DecompressDestroyChannelElem, - grpc_channel_next_get_info, - "message_decompress"}; -} // namespace grpc_core diff --git a/src/core/ext/filters/http/message_compress/message_decompress_filter.h b/src/core/ext/filters/http/message_compress/message_decompress_filter.h deleted file mode 100644 index 52addca71f1..00000000000 --- a/src/core/ext/filters/http/message_compress/message_decompress_filter.h +++ /dev/null @@ -1,32 +0,0 @@ -// -// -// Copyright 2020 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// - -#ifndef GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H -#define GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H - -#include - -#include "src/core/lib/channel/channel_fwd.h" -#include "src/core/lib/channel/channel_stack.h" - -namespace grpc_core { -extern const grpc_channel_filter MessageDecompressFilter; -} // namespace grpc_core - -#endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H \ - */ diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index ad6fb039df7..670dc05f101 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1970,7 +1970,8 @@ void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s, // what we want - which is safe because we haven't told anyone // about the metadata yet if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED || - s->recv_trailing_metadata_finished != nullptr) { + s->recv_trailing_metadata_finished != nullptr || + !s->final_metadata_requested) { s->trailing_metadata_buffer.Set(grpc_core::GrpcStatusMetadata(), status); if (!message.empty()) { s->trailing_metadata_buffer.Set( diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index 02a1ecd8692..260f71f219d 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -418,6 +418,10 @@ class ClientStream : public Orphanable { message_to_send_->payload(); batch_payload_.send_message.flags = message_to_send_->flags(); } else { + if (grpc_call_trace.enabled()) { + gpr_log(GPR_INFO, "%sPollConnectedChannel: half close", + Activity::current()->DebugTag().c_str()); + } GPR_ASSERT(!absl::holds_alternative(send_message_state_)); client_trailing_metadata_ = GetContext()->MakePooled( @@ -473,6 +477,7 @@ class ClientStream : public Orphanable { if (server_initial_metadata_state_ == ServerInitialMetadataState::kSet && !absl::holds_alternative::PushType>( recv_message_state_) && + !absl::holds_alternative(recv_message_state_) && std::exchange(queued_trailing_metadata_, false)) { if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, @@ -579,6 +584,10 @@ class ClientStream : public Orphanable { recv_message_waker_.ActivityDebugTag().c_str()); } } else { + if (grpc_call_trace.enabled()) { + gpr_log(GPR_INFO, "%sRecvMessageBatchDone: received message", + recv_message_waker_.ActivityDebugTag().c_str()); + } auto pending = absl::get_if(&recv_message_state_); GPR_ASSERT(pending != nullptr); diff --git a/src/core/lib/channel/promise_based_filter.cc b/src/core/lib/channel/promise_based_filter.cc index 87aa718b4e7..f4c6192fdff 100644 --- a/src/core/lib/channel/promise_based_filter.cc +++ b/src/core/lib/channel/promise_based_filter.cc @@ -527,6 +527,12 @@ const char* BaseCallData::ReceiveMessage::StateString(State state) { return "CANCELLED_WHILST_FORWARDING"; case State::kBatchCompletedButCancelled: return "BATCH_COMPLETED_BUT_CANCELLED"; + case State::kCancelledWhilstIdle: + return "CANCELLED_WHILST_IDLE"; + case State::kCompletedWhilePulledFromPipe: + return "COMPLETED_WHILE_PULLED_FROM_PIPE"; + case State::kCompletedWhilePushedToPipe: + return "COMPLETED_WHILE_PUSHED_TO_PIPE"; } return "UNKNOWN"; } @@ -551,7 +557,10 @@ void BaseCallData::ReceiveMessage::StartOp(CapturedBatch& batch) { case State::kBatchCompletedNoPipe: case State::kPushedToPipe: case State::kPulledFromPipe: + case State::kCompletedWhilePulledFromPipe: + case State::kCompletedWhilePushedToPipe: abort(); + case State::kCancelledWhilstIdle: case State::kCancelled: return; } @@ -586,7 +595,10 @@ void BaseCallData::ReceiveMessage::GotPipe(PipeSender* sender) { case State::kBatchCompleted: case State::kPushedToPipe: case State::kPulledFromPipe: + case State::kCompletedWhilePulledFromPipe: + case State::kCompletedWhilePushedToPipe: case State::kCancelledWhilstForwarding: + case State::kCancelledWhilstIdle: case State::kBatchCompletedButCancelled: abort(); case State::kCancelled: @@ -610,6 +622,9 @@ void BaseCallData::ReceiveMessage::OnComplete(absl::Status status) { case State::kBatchCompletedNoPipe: case State::kCancelled: case State::kBatchCompletedButCancelled: + case State::kCancelledWhilstIdle: + case State::kCompletedWhilePulledFromPipe: + case State::kCompletedWhilePushedToPipe: abort(); case State::kForwardedBatchNoPipe: state_ = State::kBatchCompletedNoPipe; @@ -636,28 +651,41 @@ void BaseCallData::ReceiveMessage::Done(const ServerMetadata& metadata, } switch (state_) { case State::kInitial: - case State::kIdle: state_ = State::kCancelled; break; + case State::kIdle: + state_ = State::kCancelledWhilstIdle; + break; case State::kForwardedBatch: case State::kForwardedBatchNoPipe: state_ = State::kCancelledWhilstForwarding; break; + case State::kCompletedWhilePulledFromPipe: + case State::kCompletedWhilePushedToPipe: case State::kPulledFromPipe: case State::kPushedToPipe: { auto status_code = - metadata.get(GrpcStatusMetadata()).value_or(GRPC_STATUS_OK); - GPR_ASSERT(status_code != GRPC_STATUS_OK); - push_.reset(); - next_.reset(); - flusher->AddClosure(intercepted_on_complete_, - StatusFromMetadata(metadata), "recv_message_done"); - state_ = State::kCancelled; + metadata.get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN); + if (status_code == GRPC_STATUS_OK) { + if (state_ == State::kCompletedWhilePulledFromPipe || + state_ == State::kPulledFromPipe) { + state_ = State::kCompletedWhilePulledFromPipe; + } else { + state_ = State::kCompletedWhilePushedToPipe; + } + } else { + push_.reset(); + next_.reset(); + flusher->AddClosure(intercepted_on_complete_, + StatusFromMetadata(metadata), "recv_message_done"); + state_ = State::kCancelled; + } } break; case State::kBatchCompleted: case State::kBatchCompletedNoPipe: case State::kBatchCompletedButCancelled: abort(); + case State::kCancelledWhilstIdle: case State::kCancelledWhilstForwarding: case State::kCancelled: break; @@ -666,8 +694,10 @@ void BaseCallData::ReceiveMessage::Done(const ServerMetadata& metadata, void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher) { if (grpc_trace_channel.enabled()) { - gpr_log(GPR_DEBUG, "%s ReceiveMessage.WakeInsideCombiner st=%s", - base_->LogTag().c_str(), StateString(state_)); + gpr_log(GPR_DEBUG, + "%s ReceiveMessage.WakeInsideCombiner st=%s push?=%s next?=%s", + base_->LogTag().c_str(), StateString(state_), + push_.has_value() ? "yes" : "no", next_.has_value() ? "yes" : "no"); } switch (state_) { case State::kInitial: @@ -678,6 +708,10 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher) { case State::kCancelledWhilstForwarding: case State::kBatchCompletedNoPipe: break; + case State::kCancelledWhilstIdle: + sender_->Close(); + state_ = State::kCancelled; + break; case State::kBatchCompletedButCancelled: sender_->Close(); state_ = State::kCancelled; @@ -701,10 +735,16 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher) { } GPR_ASSERT(state_ == State::kPushedToPipe); ABSL_FALLTHROUGH_INTENDED; + case State::kCompletedWhilePushedToPipe: case State::kPushedToPipe: { GPR_ASSERT(push_.has_value()); auto r_push = (*push_)(); if (auto* p = absl::get_if(&r_push)) { + if (grpc_trace_channel.enabled()) { + gpr_log(GPR_DEBUG, + "%s ReceiveMessage.WakeInsideCombiner push complete: %s", + base_->LogTag().c_str(), *p ? "true" : "false"); + } // We haven't pulled through yet, so this certainly shouldn't succeed. GPR_ASSERT(!*p); state_ = State::kCancelled; @@ -713,11 +753,21 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher) { GPR_ASSERT(next_.has_value()); auto r_next = (*next_)(); if (auto* p = absl::get_if>(&r_next)) { + if (grpc_trace_channel.enabled()) { + gpr_log(GPR_DEBUG, + "%s ReceiveMessage.WakeInsideCombiner next complete: %s", + base_->LogTag().c_str(), + p->has_value() ? "got message" : "end of stream"); + } next_.reset(); if (p->has_value()) { *intercepted_slice_buffer_ = std::move(*(**p)->payload()); *intercepted_flags_ = (**p)->flags(); - state_ = State::kPulledFromPipe; + if (state_ == State::kCompletedWhilePushedToPipe) { + state_ = State::kCompletedWhilePulledFromPipe; + } else { + state_ = State::kPulledFromPipe; + } } else { *intercepted_slice_buffer_ = absl::nullopt; *intercepted_flags_ = 0; @@ -725,12 +775,26 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher) { } } } - if (state_ != State::kPulledFromPipe) break; + if (state_ != State::kPulledFromPipe && + state_ != State::kCompletedWhilePulledFromPipe) { + break; + } ABSL_FALLTHROUGH_INTENDED; + case State::kCompletedWhilePulledFromPipe: case State::kPulledFromPipe: { GPR_ASSERT(push_.has_value()); if (!absl::holds_alternative((*push_)())) { - state_ = State::kIdle; + if (grpc_trace_channel.enabled()) { + gpr_log(GPR_DEBUG, + "%s ReceiveMessage.WakeInsideCombiner push complete", + base_->LogTag().c_str()); + } + if (state_ == State::kCompletedWhilePulledFromPipe) { + sender_->Close(); + state_ = State::kCancelled; + } else { + state_ = State::kIdle; + } push_.reset(); flusher->AddClosure(std::exchange(intercepted_on_complete_, nullptr), absl::OkStatus(), "recv_message"); @@ -1512,8 +1576,10 @@ void ClientCallData::RecvTrailingMetadataReady(grpc_error_handle error) { Flusher flusher(this); if (grpc_trace_channel.enabled()) { gpr_log(GPR_DEBUG, - "%s ClientCallData.RecvTrailingMetadataReady error=%s md=%s", - LogTag().c_str(), error.ToString().c_str(), + "%s ClientCallData.RecvTrailingMetadataReady " + "recv_trailing_state=%s error=%s md=%s", + LogTag().c_str(), StateString(recv_trailing_state_), + error.ToString().c_str(), recv_trailing_metadata_->DebugString().c_str()); } // If we were cancelled prior to receiving this callback, we should simply @@ -1536,6 +1602,9 @@ void ClientCallData::RecvTrailingMetadataReady(grpc_error_handle error) { // Record that we've got the callback. GPR_ASSERT(recv_trailing_state_ == RecvTrailingState::kForwarded); recv_trailing_state_ = RecvTrailingState::kComplete; + if (receive_message() != nullptr) { + receive_message()->Done(*recv_trailing_metadata_, &flusher); + } // Repoll the promise. ScopedContext context(this); WakeInsideCombiner(&flusher); diff --git a/src/core/lib/channel/promise_based_filter.h b/src/core/lib/channel/promise_based_filter.h index 84458b5271b..75da4fe620e 100644 --- a/src/core/lib/channel/promise_based_filter.h +++ b/src/core/lib/channel/promise_based_filter.h @@ -393,6 +393,9 @@ class BaseCallData : public Activity, private Wakeable { kPulledFromPipe, // We're done. kCancelled, + // Call got terminated whilst we were idle: we need to close the sender + // pipe next poll. + kCancelledWhilstIdle, // Call got terminated whilst we had forwarded a recv_message down the // stack: we need to keep track of that until we get the completion so // that we do the right thing in OnComplete. @@ -402,6 +405,9 @@ class BaseCallData : public Activity, private Wakeable { // On the next poll we'll close things out and forward on completions, // then transition to cancelled. kBatchCompletedButCancelled, + // Completed successfully while we're processing a recv message. + kCompletedWhilePushedToPipe, + kCompletedWhilePulledFromPipe, }; static const char* StateString(State); diff --git a/src/core/lib/compression/compression_internal.cc b/src/core/lib/compression/compression_internal.cc index c90817c4a7e..7fae483c6e8 100644 --- a/src/core/lib/compression/compression_internal.cc +++ b/src/core/lib/compression/compression_internal.cc @@ -21,11 +21,13 @@ #include "src/core/lib/compression/compression_internal.h" #include -#include + +#include #include "absl/container/inlined_vector.h" #include "absl/strings/ascii.h" #include "absl/strings/str_split.h" +#include "absl/types/variant.h" #include @@ -163,19 +165,13 @@ CompressionAlgorithmSet CompressionAlgorithmSet::FromUint32(uint32_t value) { } CompressionAlgorithmSet CompressionAlgorithmSet::FromChannelArgs( - const grpc_channel_args* args) { + const ChannelArgs& args) { CompressionAlgorithmSet set; static const uint32_t kEverything = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; - if (args != nullptr) { - set = CompressionAlgorithmSet::FromUint32(grpc_channel_args_find_integer( - args, GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, - grpc_integer_options{kEverything, 0, kEverything})); - set.Set(GRPC_COMPRESS_NONE); - } else { - set = CompressionAlgorithmSet::FromUint32(kEverything); - } - return set; + return CompressionAlgorithmSet::FromUint32( + args.GetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET) + .value_or(kEverything)); } CompressionAlgorithmSet::CompressionAlgorithmSet() = default; @@ -230,18 +226,14 @@ uint32_t CompressionAlgorithmSet::ToLegacyBitmask() const { } absl::optional -DefaultCompressionAlgorithmFromChannelArgs(const grpc_channel_args* args) { - if (args == nullptr) return absl::nullopt; - for (size_t i = 0; i < args->num_args; i++) { - if (strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM) == - 0) { - if (args->args[i].type == GRPC_ARG_INTEGER) { - return static_cast( - args->args[i].value.integer); - } else if (args->args[i].type == GRPC_ARG_STRING) { - return ParseCompressionAlgorithm(args->args[i].value.string); - } - } +DefaultCompressionAlgorithmFromChannelArgs(const ChannelArgs& args) { + auto* value = args.Get(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM); + if (value == nullptr) return absl::nullopt; + if (auto* p = absl::get_if(value)) { + return static_cast(*p); + } + if (auto* p = absl::get_if(value)) { + return ParseCompressionAlgorithm(*p); } return absl::nullopt; } diff --git a/src/core/lib/compression/compression_internal.h b/src/core/lib/compression/compression_internal.h index 3973952aaad..f44ae9ebb36 100644 --- a/src/core/lib/compression/compression_internal.h +++ b/src/core/lib/compression/compression_internal.h @@ -29,8 +29,8 @@ #include "absl/types/optional.h" #include -#include +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/bitset.h" #include "src/core/lib/slice/slice.h" @@ -46,7 +46,7 @@ const char* CompressionAlgorithmAsString(grpc_compression_algorithm algorithm); // Retrieve the default compression algorithm from channel args, return nullopt // if not found. absl::optional -DefaultCompressionAlgorithmFromChannelArgs(const grpc_channel_args* args); +DefaultCompressionAlgorithmFromChannelArgs(const ChannelArgs& args); // A set of grpc_compression_algorithm values. class CompressionAlgorithmSet { @@ -55,7 +55,7 @@ class CompressionAlgorithmSet { // algorithm 1, etc. static CompressionAlgorithmSet FromUint32(uint32_t value); // Locate in channel args and construct from the found value. - static CompressionAlgorithmSet FromChannelArgs(const grpc_channel_args* args); + static CompressionAlgorithmSet FromChannelArgs(const ChannelArgs& args); // Parse a string of comma-separated compression algorithms. static CompressionAlgorithmSet FromString(absl::string_view str); // Construct an empty set. diff --git a/src/core/lib/gprpp/bitset.h b/src/core/lib/gprpp/bitset.h index 371668c93b4..77ed4552415 100644 --- a/src/core/lib/gprpp/bitset.h +++ b/src/core/lib/gprpp/bitset.h @@ -171,6 +171,18 @@ class BitSet { return result; } + BitSet& Set(int i, bool value) { + set(i, value); + return *this; + } + + BitSet& SetAll(bool value) { + for (size_t i = 0; i < kTotalBits; i++) { + set(i, value); + } + return *this; + } + private: // Given a bit index, return which unit it's stored in. static constexpr size_t unit_for(size_t bit) { return bit / kUnitBits; } diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 141b16e345a..5523ed387ad 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -1358,11 +1358,10 @@ void Server::CallData::RecvInitialMetadataReady(void* arg, } if (calld->host_.has_value() && calld->path_.has_value()) { /* do nothing */ - } else { + } else if (error.ok()) { /* Pass the error reference to calld->recv_initial_metadata_error */ grpc_error_handle src_error = error; - error = GRPC_ERROR_CREATE_REFERENCING("Missing :authority or :path", - &src_error, 1); + error = absl::UnknownError("Missing :authority or :path"); calld->recv_initial_metadata_error_ = error; } grpc_closure* closure = calld->original_recv_initial_metadata_ready_; diff --git a/src/core/lib/transport/metadata_batch.cc b/src/core/lib/transport/metadata_batch.cc index b0cfca78186..83753246d42 100644 --- a/src/core/lib/transport/metadata_batch.cc +++ b/src/core/lib/transport/metadata_batch.cc @@ -204,7 +204,10 @@ StaticSlice HttpMethodMetadata::Encode(ValueType x) { case kGet: return StaticSlice::FromStaticString("GET"); default: - abort(); + // TODO(ctiller): this should be an abort, we should split up the debug + // string generation from the encode string generation so that debug + // strings can always succeed and encode strings can crash. + return StaticSlice::FromStaticString("<>"); } } diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 20438198590..469013ec22d 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -80,8 +80,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/http/client/http_client_filter.cc', 'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/http_filters_plugin.cc', - 'src/core/ext/filters/http/message_compress/message_compress_filter.cc', - 'src/core/ext/filters/http/message_compress/message_decompress_filter.cc', + 'src/core/ext/filters/http/message_compress/compression_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', 'src/core/ext/filters/rbac/rbac_filter.cc', diff --git a/test/core/channel/BUILD b/test/core/channel/BUILD index 592d8ee44b3..516976b2103 100644 --- a/test/core/channel/BUILD +++ b/test/core/channel/BUILD @@ -64,6 +64,9 @@ grpc_cc_test( grpc_cc_test( name = "minimal_stack_is_minimal_test", srcs = ["minimal_stack_is_minimal_test.cc"], + external_deps = [ + "gtest", + ], language = "C++", uses_event_engine = False, uses_polling = False, diff --git a/test/core/channel/minimal_stack_is_minimal_test.cc b/test/core/channel/minimal_stack_is_minimal_test.cc index 8bef0283ef4..5c043e52dc0 100644 --- a/test/core/channel/minimal_stack_is_minimal_test.cc +++ b/test/core/channel/minimal_stack_is_minimal_test.cc @@ -29,14 +29,13 @@ * configurations and assess whether such a change is correct and desirable. */ -#include #include #include #include #include -#include "absl/strings/str_join.h" +#include "gtest/gtest.h" #include #include @@ -52,88 +51,12 @@ #include "src/core/lib/transport/transport_impl.h" #include "test/core/util/test_config.h" -// use CHECK_STACK instead -static int check_stack(const char* file, int line, const char* transport_name, - grpc_channel_args* init_args, - unsigned channel_stack_type, ...); - -// arguments: const char *transport_name - the name of the transport type to -// simulate -// grpc_channel_args *init_args - channel args to pass down -// grpc_channel_stack_type channel_stack_type - the archetype of -// channel stack to create -// variadic arguments - the (in-order) expected list of channel -// filters to instantiate, terminated with NULL -#define CHECK_STACK(...) check_stack(__FILE__, __LINE__, __VA_ARGS__) - -int main(int argc, char** argv) { - grpc::testing::TestEnvironment env(&argc, argv); - grpc_init(); - int errors = 0; - - // tests with a minimal stack - grpc_arg minimal_stack_arg; - minimal_stack_arg.type = GRPC_ARG_INTEGER; - minimal_stack_arg.key = const_cast(GRPC_ARG_MINIMAL_STACK); - minimal_stack_arg.value.integer = 1; - grpc_channel_args minimal_stack_args = {1, &minimal_stack_arg}; - errors += - CHECK_STACK("unknown", &minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL, - "authority", "connected", NULL); - errors += CHECK_STACK("unknown", &minimal_stack_args, GRPC_CLIENT_SUBCHANNEL, - "authority", "connected", NULL); - errors += CHECK_STACK("unknown", &minimal_stack_args, GRPC_SERVER_CHANNEL, - "server", "connected", NULL); - errors += - CHECK_STACK("chttp2", &minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL, - "authority", "http-client", "connected", NULL); - errors += CHECK_STACK("chttp2", &minimal_stack_args, GRPC_CLIENT_SUBCHANNEL, - "authority", "http-client", "connected", NULL); - errors += CHECK_STACK("chttp2", &minimal_stack_args, GRPC_SERVER_CHANNEL, - "server", "http-server", "connected", NULL); - errors += CHECK_STACK(nullptr, &minimal_stack_args, GRPC_CLIENT_CHANNEL, - "client-channel", NULL); - - // tests with a default stack - errors += - CHECK_STACK("unknown", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, "authority", - "message_size", "deadline", "connected", NULL); - errors += CHECK_STACK("unknown", nullptr, GRPC_CLIENT_SUBCHANNEL, "authority", - "message_size", "connected", NULL); - errors += CHECK_STACK("unknown", nullptr, GRPC_SERVER_CHANNEL, "server", - "message_size", "deadline", "connected", NULL); - errors += - CHECK_STACK("chttp2", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, "authority", - "message_size", "deadline", "http-client", - "message_decompress", "message_compress", "connected", NULL); - errors += CHECK_STACK("chttp2", nullptr, GRPC_CLIENT_SUBCHANNEL, "authority", - "message_size", "http-client", "message_decompress", - "message_compress", "connected", NULL); - errors += - CHECK_STACK("chttp2", nullptr, GRPC_SERVER_CHANNEL, "server", - "message_size", "deadline", "http-server", - "message_decompress", "message_compress", "connected", NULL); - errors += CHECK_STACK(nullptr, nullptr, GRPC_CLIENT_CHANNEL, "client-channel", - NULL); - - GPR_ASSERT(errors == 0); - grpc_shutdown(); - return 0; -} - -/******************************************************************************* - * End of tests definitions, start of test infrastructure - */ - -static int check_stack(const char* file, int line, const char* transport_name, - grpc_channel_args* init_args, - unsigned channel_stack_type, ...) { - grpc_core::ChannelArgs channel_args = - grpc_core::ChannelArgs::FromC(init_args); +std::vector MakeStack(const char* transport_name, + const grpc_core::ChannelArgs& channel_args, + grpc_channel_stack_type channel_stack_type) { // create phony channel stack - grpc_core::ChannelStackBuilderImpl builder( - "test", static_cast(channel_stack_type), - channel_args); + grpc_core::ChannelStackBuilderImpl builder("test", channel_stack_type, + channel_args); grpc_transport_vtable fake_transport_vtable; memset(&fake_transport_vtable, 0, sizeof(grpc_transport_vtable)); fake_transport_vtable.name = transport_name; @@ -148,44 +71,75 @@ static int check_stack(const char* file, int line, const char* transport_name, &builder)); } - // build up our expectation list std::vector parts; - va_list args; - va_start(args, channel_stack_type); - for (;;) { - char* a = va_arg(args, char*); - if (a == nullptr) break; - parts.push_back(a); - } - va_end(args); - std::string expect = absl::StrJoin(parts, ", "); - - // build up our "got" list - parts.clear(); for (const auto& entry : *builder.mutable_stack()) { const char* name = entry->name; if (name == nullptr) continue; parts.push_back(name); } - std::string got = absl::StrJoin(parts, ", "); - - // figure out result, log if there's an error - int result = 0; - if (got != expect) { - std::string args_str = channel_args.ToString(); - - gpr_log(file, line, GPR_LOG_SEVERITY_ERROR, - "**************************************************"); - gpr_log( - file, line, GPR_LOG_SEVERITY_ERROR, - "FAILED transport=%s; stack_type=%s; channel_args=%s:", transport_name, - grpc_channel_stack_type_string( - static_cast(channel_stack_type)), - args_str.c_str()); - gpr_log(file, line, GPR_LOG_SEVERITY_ERROR, "EXPECTED: %s", expect.c_str()); - gpr_log(file, line, GPR_LOG_SEVERITY_ERROR, "GOT: %s", got.c_str()); - result = 1; - } - return result; + return parts; +} + +TEST(ChannelStackFilters, LooksAsExpected) { + const auto minimal_stack_args = + grpc_core::ChannelArgs().Set(GRPC_ARG_MINIMAL_STACK, true); + const auto no_args = grpc_core::ChannelArgs(); + + EXPECT_EQ( + MakeStack("unknown", minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL), + std::vector({"authority", "connected"})); + EXPECT_EQ(MakeStack("unknown", minimal_stack_args, GRPC_CLIENT_SUBCHANNEL), + std::vector({"authority", "connected"})); + EXPECT_EQ(MakeStack("unknown", minimal_stack_args, GRPC_SERVER_CHANNEL), + std::vector({"server", "connected"})); + + EXPECT_EQ(MakeStack("chttp2", minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL), + std::vector( + {"authority", "http-client", "compression", "connected"})); + EXPECT_EQ(MakeStack("chttp2", minimal_stack_args, GRPC_CLIENT_SUBCHANNEL), + std::vector( + {"authority", "http-client", "compression", "connected"})); + EXPECT_EQ(MakeStack("chttp2", minimal_stack_args, GRPC_SERVER_CHANNEL), + std::vector( + {"server", "http-server", "compression", "connected"})); + EXPECT_EQ(MakeStack(nullptr, minimal_stack_args, GRPC_CLIENT_CHANNEL), + std::vector({"client-channel"})); + + // tests with a default stack + + EXPECT_EQ(MakeStack("unknown", no_args, GRPC_CLIENT_DIRECT_CHANNEL), + std::vector( + {"authority", "message_size", "deadline", "connected"})); + EXPECT_EQ( + MakeStack("unknown", no_args, GRPC_CLIENT_SUBCHANNEL), + std::vector({"authority", "message_size", "connected"})); + EXPECT_EQ(MakeStack("unknown", no_args, GRPC_SERVER_CHANNEL), + std::vector( + {"server", "message_size", "deadline", "connected"})); + + EXPECT_EQ( + MakeStack("chttp2", no_args, GRPC_CLIENT_DIRECT_CHANNEL), + std::vector({"authority", "message_size", "deadline", + "http-client", "compression", "connected"})); + EXPECT_EQ( + MakeStack("chttp2", no_args, GRPC_CLIENT_SUBCHANNEL), + std::vector({"authority", "message_size", "http-client", + "compression", "connected"})); + + EXPECT_EQ( + MakeStack("chttp2", no_args, GRPC_SERVER_CHANNEL), + std::vector({"server", "message_size", "deadline", + "http-server", "compression", "connected"})); + EXPECT_EQ(MakeStack(nullptr, no_args, GRPC_CLIENT_CHANNEL), + std::vector({"client-channel"})); +} + +int main(int argc, char** argv) { + grpc::testing::TestEnvironment env(&argc, argv); + ::testing::InitGoogleTest(&argc, argv); + grpc_init(); + int r = RUN_ALL_TESTS(); + grpc_shutdown(); + return r; } diff --git a/test/core/compression/BUILD b/test/core/compression/BUILD index f10b6ccb3b4..1cc6b696e24 100644 --- a/test/core/compression/BUILD +++ b/test/core/compression/BUILD @@ -14,7 +14,6 @@ load( "//bazel:grpc_build_system.bzl", - "grpc_cc_library", "grpc_cc_test", "grpc_package", ) @@ -24,15 +23,6 @@ grpc_package(name = "test/core/compression") licenses(["notice"]) -grpc_cc_library( - name = "args_utils", - testonly = 1, - srcs = ["args_utils.cc"], - hdrs = ["args_utils.h"], - visibility = ["//visibility:public"], - deps = ["//:grpc"], -) - grpc_cc_test( name = "compression_test", srcs = ["compression_test.cc"], @@ -41,7 +31,6 @@ grpc_cc_test( uses_event_engine = False, uses_polling = False, deps = [ - ":args_utils", "//:gpr", "//:grpc", "//test/core/util:grpc_test_util", diff --git a/test/core/compression/args_utils.cc b/test/core/compression/args_utils.cc deleted file mode 100644 index de965aeba31..00000000000 --- a/test/core/compression/args_utils.cc +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright 2021 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "test/core/compression/args_utils.h" - -#include - -#include "absl/types/optional.h" - -#include -#include - -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/compression/compression_internal.h" -#include "src/core/lib/gpr/useful.h" - -const grpc_channel_args* -grpc_channel_args_set_channel_default_compression_algorithm( - const grpc_channel_args* a, grpc_compression_algorithm algorithm) { - GPR_ASSERT(algorithm < GRPC_COMPRESS_ALGORITHMS_COUNT); - grpc_arg tmp; - tmp.type = GRPC_ARG_INTEGER; - tmp.key = const_cast(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM); - tmp.value.integer = algorithm; - return grpc_channel_args_copy_and_add(a, &tmp, 1); -} - -/** Returns 1 if the argument for compression algorithm's enabled states bitset - * was found in \a a, returning the arg's value in \a states. Otherwise, returns - * 0. */ -static int find_compression_algorithm_states_bitset(const grpc_channel_args* a, - int** states_arg) { - if (a != nullptr) { - size_t i; - for (i = 0; i < a->num_args; ++i) { - if (a->args[i].type == GRPC_ARG_INTEGER && - !strcmp(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, - a->args[i].key)) { - *states_arg = &a->args[i].value.integer; - **states_arg = - (**states_arg & ((1 << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1)) | - 0x1; /* forcefully enable support for no compression */ - return 1; - } - } - } - return 0; /* GPR_FALSE */ -} - -const grpc_channel_args* grpc_channel_args_compression_algorithm_set_state( - const grpc_channel_args** a, grpc_compression_algorithm algorithm, - int state) { - int* states_arg = nullptr; - const grpc_channel_args* result = *a; - const int states_arg_found = - find_compression_algorithm_states_bitset(*a, &states_arg); - - if (grpc_core::DefaultCompressionAlgorithmFromChannelArgs(*a) == algorithm && - state == 0) { - const char* algo_name = nullptr; - GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name) != 0); - gpr_log(GPR_ERROR, - "Tried to disable default compression algorithm '%s'. The " - "operation has been ignored.", - algo_name); - } else if (states_arg_found) { - if (state != 0) { - grpc_core::SetBit(reinterpret_cast(states_arg), algorithm); - } else if (algorithm != GRPC_COMPRESS_NONE) { - grpc_core::ClearBit(reinterpret_cast(states_arg), algorithm); - } - } else { - /* create a new arg */ - grpc_arg tmp; - tmp.type = GRPC_ARG_INTEGER; - tmp.key = - const_cast(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET); - /* all enabled by default */ - tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; - if (state != 0) { - grpc_core::SetBit(reinterpret_cast(&tmp.value.integer), - algorithm); - } else if (algorithm != GRPC_COMPRESS_NONE) { - grpc_core::ClearBit(reinterpret_cast(&tmp.value.integer), - algorithm); - } - result = grpc_channel_args_copy_and_add(*a, &tmp, 1); - grpc_channel_args_destroy(*a); - *a = result; - } - return result; -} diff --git a/test/core/compression/args_utils.h b/test/core/compression/args_utils.h deleted file mode 100644 index d6b73b13b35..00000000000 --- a/test/core/compression/args_utils.h +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2021 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef GRPC_TEST_CORE_COMPRESSION_ARGS_UTILS_H_H -#define GRPC_TEST_CORE_COMPRESSION_ARGS_UTILS_H_H - -#include -#include - -// TODO(ctiller): when we do the channel args migration, just delete this. -const grpc_channel_args* -grpc_channel_args_set_channel_default_compression_algorithm( - const grpc_channel_args* a, grpc_compression_algorithm algorithm); - -const grpc_channel_args* grpc_channel_args_compression_algorithm_set_state( - const grpc_channel_args** a, grpc_compression_algorithm algorithm, - int state); - -const grpc_channel_args* -grpc_channel_args_set_channel_default_compression_algorithm( - const grpc_channel_args* a, grpc_compression_algorithm algorithm); - -#endif diff --git a/test/core/compression/compression_test.cc b/test/core/compression/compression_test.cc index 2dde22874f5..396e2ce9370 100644 --- a/test/core/compression/compression_test.cc +++ b/test/core/compression/compression_test.cc @@ -22,15 +22,10 @@ #include "gtest/gtest.h" #include -#include #include #include -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/compression/compression_internal.h" #include "src/core/lib/gpr/useful.h" -#include "src/core/lib/iomgr/exec_ctx.h" -#include "test/core/compression/args_utils.h" #include "test/core/util/test_config.h" TEST(CompressionTest, CompressionAlgorithmParse) { @@ -245,70 +240,6 @@ TEST(CompressionTest, CompressionEnableDisableAlgorithm) { } } -TEST(CompressionTest, ChannelArgsSetCompressionAlgorithm) { - grpc_core::ExecCtx exec_ctx; - const grpc_channel_args* ch_args; - - ch_args = grpc_channel_args_set_channel_default_compression_algorithm( - nullptr, GRPC_COMPRESS_GZIP); - ASSERT_EQ(ch_args->num_args, 1); - ASSERT_STREQ(ch_args->args[0].key, - GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM); - ASSERT_EQ(ch_args->args[0].type, GRPC_ARG_INTEGER); - - grpc_channel_args_destroy(ch_args); -} - -TEST(CompressionTest, ChannelArgsCompressionAlgorithmStates) { - grpc_core::ExecCtx exec_ctx; - grpc_core::CompressionAlgorithmSet states; - - const grpc_channel_args* ch_args = - grpc_channel_args_copy_and_add(nullptr, nullptr, 0); - /* by default, all enabled */ - states = grpc_core::CompressionAlgorithmSet::FromChannelArgs(ch_args); - - for (size_t i = 0; i < GRPC_COMPRESS_ALGORITHMS_COUNT; i++) { - ASSERT_TRUE(states.IsSet(static_cast(i))); - } - - /* disable gzip and deflate and stream/gzip */ - const grpc_channel_args* ch_args_wo_gzip = - grpc_channel_args_compression_algorithm_set_state(&ch_args, - GRPC_COMPRESS_GZIP, 0); - ASSERT_EQ(ch_args, ch_args_wo_gzip); - const grpc_channel_args* ch_args_wo_gzip_deflate = - grpc_channel_args_compression_algorithm_set_state( - &ch_args_wo_gzip, GRPC_COMPRESS_DEFLATE, 0); - ASSERT_EQ(ch_args_wo_gzip, ch_args_wo_gzip_deflate); - - states = grpc_core::CompressionAlgorithmSet::FromChannelArgs( - ch_args_wo_gzip_deflate); - for (size_t i = 0; i < GRPC_COMPRESS_ALGORITHMS_COUNT; i++) { - if (i == GRPC_COMPRESS_GZIP || i == GRPC_COMPRESS_DEFLATE) { - ASSERT_FALSE(states.IsSet(static_cast(i))); - } else { - ASSERT_TRUE(states.IsSet(static_cast(i))); - } - } - - /* re-enabled gzip only */ - ch_args_wo_gzip = grpc_channel_args_compression_algorithm_set_state( - &ch_args_wo_gzip_deflate, GRPC_COMPRESS_GZIP, 1); - ASSERT_EQ(ch_args_wo_gzip, ch_args_wo_gzip_deflate); - - states = grpc_core::CompressionAlgorithmSet::FromChannelArgs(ch_args_wo_gzip); - for (size_t i = 0; i < GRPC_COMPRESS_ALGORITHMS_COUNT; i++) { - if (i == GRPC_COMPRESS_DEFLATE) { - ASSERT_FALSE(states.IsSet(static_cast(i))); - } else { - ASSERT_TRUE(states.IsSet(static_cast(i))); - } - } - - grpc_channel_args_destroy(ch_args); -} - int main(int argc, char** argv) { grpc::testing::TestEnvironment env(&argc, argv); ::testing::InitGoogleTest(&argc, argv); diff --git a/test/core/end2end/fixtures/h2_compress.cc b/test/core/end2end/fixtures/h2_compress.cc index 6f51ec2648c..023c6278afd 100644 --- a/test/core/end2end/fixtures/h2_compress.cc +++ b/test/core/end2end/fixtures/h2_compress.cc @@ -28,19 +28,12 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/iomgr/exec_ctx.h" -#include "test/core/compression/args_utils.h" #include "test/core/end2end/end2end_tests.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" struct fullstack_compression_fixture_data { - ~fullstack_compression_fixture_data() { - grpc_channel_args_destroy(client_args_compression); - grpc_channel_args_destroy(server_args_compression); - } std::string localaddr; - const grpc_channel_args* client_args_compression = nullptr; - const grpc_channel_args* server_args_compression = nullptr; }; static grpc_end2end_test_fixture chttp2_create_fixture_fullstack_compression( @@ -63,16 +56,14 @@ void chttp2_init_client_fullstack_compression( grpc_end2end_test_fixture* f, const grpc_channel_args* client_args) { fullstack_compression_fixture_data* ffd = static_cast(f->fixture_data); - if (ffd->client_args_compression != nullptr) { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(ffd->client_args_compression); - } - ffd->client_args_compression = - grpc_channel_args_set_channel_default_compression_algorithm( - client_args, GRPC_COMPRESS_GZIP); grpc_channel_credentials* creds = grpc_insecure_credentials_create(); - f->client = grpc_channel_create(ffd->localaddr.c_str(), creds, - ffd->client_args_compression); + f->client = grpc_channel_create( + ffd->localaddr.c_str(), creds, + grpc_core::ChannelArgs::FromC(client_args) + .SetIfUnset(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, + GRPC_COMPRESS_GZIP) + .ToC() + .get()); grpc_channel_credentials_release(creds); } @@ -80,17 +71,16 @@ void chttp2_init_server_fullstack_compression( grpc_end2end_test_fixture* f, const grpc_channel_args* server_args) { fullstack_compression_fixture_data* ffd = static_cast(f->fixture_data); - if (ffd->server_args_compression != nullptr) { - grpc_core::ExecCtx exec_ctx; - grpc_channel_args_destroy(ffd->server_args_compression); - } - ffd->server_args_compression = - grpc_channel_args_set_channel_default_compression_algorithm( - server_args, GRPC_COMPRESS_GZIP); if (f->server) { grpc_server_destroy(f->server); } - f->server = grpc_server_create(ffd->server_args_compression, nullptr); + f->server = grpc_server_create( + grpc_core::ChannelArgs::FromC(server_args) + .SetIfUnset(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, + GRPC_COMPRESS_GZIP) + .ToC() + .get(), + nullptr); grpc_server_register_completion_queue(f->server, f->cq, nullptr); grpc_server_credentials* server_creds = grpc_insecure_server_credentials_create(); diff --git a/test/core/end2end/fuzzers/client_fuzzer.cc b/test/core/end2end/fuzzers/client_fuzzer.cc index 62421332c16..c746efb0fe1 100644 --- a/test/core/end2end/fuzzers/client_fuzzer.cc +++ b/test/core/end2end/fuzzers/client_fuzzer.cc @@ -17,8 +17,11 @@ */ #include +#include #include +#include + #include "absl/status/statusor.h" #include @@ -39,6 +42,7 @@ #include "src/core/lib/resource_quota/api.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_stack_type.h" +#include "src/core/lib/surface/event_string.h" #include "src/core/lib/transport/transport_fwd.h" #include "test/core/util/mock_endpoint.h" @@ -151,13 +155,23 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { for (int i = 0; i < requested_calls; i++) { ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr); - GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); + if (ev.type != GRPC_OP_COMPLETE) { + gpr_log(GPR_ERROR, + "[%d/%d requested calls] Unexpected event type (expected " + "COMPLETE): %s", + i, requested_calls, grpc_event_string(&ev).c_str()); + abort(); + } } grpc_completion_queue_shutdown(cq); for (int i = 0; i < requested_calls; i++) { ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr); - GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN); + if (ev.type != GRPC_QUEUE_SHUTDOWN) { + gpr_log(GPR_ERROR, "Unexpected event type (expected SHUTDOWN): %s", + grpc_event_string(&ev).c_str()); + abort(); + } } grpc_call_unref(call); grpc_completion_queue_destroy(cq); diff --git a/test/core/end2end/generate_tests.bzl b/test/core/end2end/generate_tests.bzl index 03dcf6dcae5..a901ae39fdf 100755 --- a/test/core/end2end/generate_tests.bzl +++ b/test/core/end2end/generate_tests.bzl @@ -450,7 +450,6 @@ def grpc_end2end_tests(): ":local_util", "//test/core/util:test_lb_policies", "//:grpc_authorization_provider", - "//test/core/compression:args_utils", "//:grpc_http_filters", "//src/core:event_log", ], @@ -475,7 +474,6 @@ def grpc_end2end_tests(): "//test/core/util:grpc_test_util", "//:grpc", "//:gpr", - "//test/core/compression:args_utils", "//:grpc_http_filters", ], tags = _platform_support_tags(fopt) + fopt.tags, diff --git a/test/core/end2end/tests/compressed_payload.cc b/test/core/end2end/tests/compressed_payload.cc index 3b251918d78..6003e5d8a54 100644 --- a/test/core/end2end/tests/compressed_payload.cc +++ b/test/core/end2end/tests/compressed_payload.cc @@ -33,9 +33,9 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/bitset.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/call_test_only.h" -#include "test/core/compression/args_utils.h" #include "test/core/end2end/cq_verifier.h" #include "test/core/end2end/end2end_tests.h" #include "test/core/util/test_config.h" @@ -109,8 +109,8 @@ static void request_for_disabled_algorithm( grpc_call* s; grpc_slice request_payload_slice; grpc_byte_buffer* request_payload; - const grpc_channel_args* client_args; - const grpc_channel_args* server_args; + grpc_core::ChannelArgs client_args; + grpc_core::ChannelArgs server_args; grpc_end2end_test_fixture f; grpc_op ops[6]; grpc_op* op; @@ -130,28 +130,26 @@ static void request_for_disabled_algorithm( request_payload_slice = grpc_slice_from_copied_string(str); request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); - client_args = grpc_channel_args_set_channel_default_compression_algorithm( - nullptr, requested_client_compression_algorithm); - server_args = grpc_channel_args_set_channel_default_compression_algorithm( - nullptr, GRPC_COMPRESS_NONE); - server_args = grpc_channel_args_compression_algorithm_set_state( - &server_args, algorithm_to_disable, false); + client_args = + grpc_core::ChannelArgs().Set(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, + requested_client_compression_algorithm); + server_args = + grpc_core::ChannelArgs() + .Set(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, GRPC_COMPRESS_NONE) + .Set(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, + grpc_core::BitSet() + .SetAll(true) + .Set(algorithm_to_disable, false) + .ToInt()); if (!decompress_in_core) { - grpc_arg disable_decompression_in_core_arg = - grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION), 0); - const grpc_channel_args* old_client_args = client_args; - const grpc_channel_args* old_server_args = server_args; - client_args = grpc_channel_args_copy_and_add( - client_args, &disable_decompression_in_core_arg, 1); - server_args = grpc_channel_args_copy_and_add( - server_args, &disable_decompression_in_core_arg, 1); - grpc_channel_args_destroy(old_client_args); - grpc_channel_args_destroy(old_server_args); + client_args = + client_args.Set(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, false); + server_args = + server_args.Set(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, false); } - f = begin_test(config, test_name, client_args, server_args, - decompress_in_core); + f = begin_test(config, test_name, client_args.ToC().get(), + server_args.ToC().get(), decompress_in_core); grpc_core::CqVerifier cqv(f.cq); gpr_timespec deadline = five_seconds_from_now(); @@ -266,8 +264,6 @@ static void request_for_disabled_algorithm( grpc_slice_unref(request_payload_slice); grpc_byte_buffer_destroy(request_payload); grpc_byte_buffer_destroy(request_payload_recv); - grpc_channel_args_destroy(client_args); - grpc_channel_args_destroy(server_args); end_test(&f); config.tear_down_data(&f); } @@ -286,8 +282,8 @@ static void request_with_payload_template_inner( grpc_call* s; grpc_slice request_payload_slice; grpc_byte_buffer* request_payload = nullptr; - const grpc_channel_args* client_args; - const grpc_channel_args* server_args; + grpc_core::ChannelArgs client_args; + grpc_core::ChannelArgs server_args; grpc_end2end_test_fixture f; grpc_op ops[6]; grpc_op* op; @@ -315,25 +311,20 @@ static void request_with_payload_template_inner( grpc_slice response_payload_slice = grpc_slice_from_copied_string(response_str); - client_args = grpc_channel_args_set_channel_default_compression_algorithm( - nullptr, default_client_channel_compression_algorithm); - server_args = grpc_channel_args_set_channel_default_compression_algorithm( - nullptr, default_server_channel_compression_algorithm); + client_args = grpc_core::ChannelArgs().Set( + GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, + default_client_channel_compression_algorithm); + server_args = grpc_core::ChannelArgs().Set( + GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, + default_server_channel_compression_algorithm); if (!decompress_in_core) { - grpc_arg disable_decompression_in_core_arg = - grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION), 0); - const grpc_channel_args* old_client_args = client_args; - const grpc_channel_args* old_server_args = server_args; - client_args = grpc_channel_args_copy_and_add( - client_args, &disable_decompression_in_core_arg, 1); - server_args = grpc_channel_args_copy_and_add( - server_args, &disable_decompression_in_core_arg, 1); - grpc_channel_args_destroy(old_client_args); - grpc_channel_args_destroy(old_server_args); + client_args = + client_args.Set(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, false); + server_args = + server_args.Set(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION, false); } - f = begin_test(config, test_name, client_args, server_args, - decompress_in_core); + f = begin_test(config, test_name, client_args.ToC().get(), + server_args.ToC().get(), decompress_in_core); grpc_core::CqVerifier cqv(f.cq); gpr_timespec deadline = five_seconds_from_now(); @@ -558,8 +549,6 @@ static void request_with_payload_template_inner( grpc_call_unref(c); grpc_call_unref(s); - grpc_channel_args_destroy(client_args); - grpc_channel_args_destroy(server_args); end_test(&f); config.tear_down_data(&f); } diff --git a/test/core/end2end/tests/max_message_length.cc b/test/core/end2end/tests/max_message_length.cc index 8425f73faf5..2eaa7d0fa03 100644 --- a/test/core/end2end/tests/max_message_length.cc +++ b/test/core/end2end/tests/max_message_length.cc @@ -19,6 +19,8 @@ #include #include +#include + #include "absl/strings/match.h" #include @@ -43,7 +45,10 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_channel_args* client_args, grpc_channel_args* server_args) { grpc_end2end_test_fixture f; - gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name); + gpr_log(GPR_INFO, "\n\n\nRunning test: %s/%s client_args=%s server_args=%s", + test_name, config.name, + grpc_core::ChannelArgs::FromC(client_args).ToString().c_str(), + grpc_core::ChannelArgs::FromC(server_args).ToString().c_str()); // We intentionally do not pass the client and server args to // create_fixture(), since we don't want the limit enforced on the // proxy, only on the backend server. @@ -482,11 +487,8 @@ static grpc_metadata gzip_compression_override() { // Test receive message limit with compressed request larger than the limit static void test_max_receive_message_length_on_compressed_request( - grpc_end2end_test_config config, bool minimal_stack) { - gpr_log(GPR_INFO, - "test max receive message length on compressed request with " - "minimal_stack=%d", - minimal_stack); + grpc_end2end_test_config config) { + gpr_log(GPR_INFO, "test max receive message length on compressed request"); grpc_end2end_test_fixture f; grpc_call* c = nullptr; grpc_call* s = nullptr; @@ -503,17 +505,15 @@ static void test_max_receive_message_length_on_compressed_request( grpc_call_details call_details; grpc_status_code status; grpc_call_error error; - grpc_slice details, status_details; + grpc_slice details; int was_cancelled = 2; // Set limit via channel args. - grpc_arg arg[2]; + grpc_arg arg[1]; arg[0] = grpc_channel_arg_integer_create( const_cast(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), 5); - arg[1] = grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_MINIMAL_STACK), minimal_stack); grpc_channel_args* server_args = - grpc_channel_args_copy_and_add(nullptr, arg, 2); + grpc_channel_args_copy_and_add(nullptr, arg, 1); f = begin_test(config, "test_max_request_message_length", nullptr, server_args); @@ -592,41 +592,20 @@ static void test_max_receive_message_length_on_compressed_request( op->flags = 0; op->reserved = nullptr; op++; - if (minimal_stack) { - /* Expect the RPC to proceed normally for a minimal stack */ - op->op = GRPC_OP_SEND_INITIAL_METADATA; - op->data.send_initial_metadata.count = 0; - op->flags = 0; - op->reserved = nullptr; - op++; - op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; - op->data.send_status_from_server.trailing_metadata_count = 0; - op->data.send_status_from_server.status = GRPC_STATUS_OK; - status_details = grpc_slice_from_static_string("xyz"); - op->data.send_status_from_server.status_details = &status_details; - op->flags = 0; - op->reserved = nullptr; - op++; - } error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(103), nullptr); GPR_ASSERT(GRPC_CALL_OK == error); - cqv.Expect(tag(102), minimal_stack); + cqv.Expect(tag(102), false); cqv.Expect(tag(103), true); cqv.Expect(tag(1), true); cqv.Verify(); GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method")); - if (minimal_stack) { - /* We do not perform message size checks for minimal stack. */ - GPR_ASSERT(status == GRPC_STATUS_OK); - } else { - GPR_ASSERT(was_cancelled == 1); - GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED); - GPR_ASSERT(absl::StartsWith(grpc_core::StringViewFromSlice(details), - "Received message larger than max")); - } + GPR_ASSERT(was_cancelled == 1); + GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED); + GPR_ASSERT(absl::StartsWith(grpc_core::StringViewFromSlice(details), + "Received message larger than max")); grpc_slice_unref(details); grpc_slice_unref(request_payload_slice); grpc_metadata_array_destroy(&initial_metadata_recv); @@ -644,11 +623,9 @@ static void test_max_receive_message_length_on_compressed_request( // Test receive message limit with compressed response larger than the limit. static void test_max_receive_message_length_on_compressed_response( - grpc_end2end_test_config config, bool minimal_stack) { + grpc_end2end_test_config config) { gpr_log(GPR_INFO, - "testing max receive message length on compressed response with " - "minimal_stack=%d", - minimal_stack); + "testing max receive message length on compressed response"); grpc_end2end_test_fixture f; grpc_call* c = nullptr; grpc_call* s = nullptr; @@ -669,13 +646,11 @@ static void test_max_receive_message_length_on_compressed_response( int was_cancelled = 2; // Set limit via channel args. - grpc_arg arg[2]; + grpc_arg arg[1]; arg[0] = grpc_channel_arg_integer_create( const_cast(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), 5); - arg[1] = grpc_channel_arg_integer_create( - const_cast(GRPC_ARG_MINIMAL_STACK), minimal_stack); grpc_channel_args* client_args = - grpc_channel_args_copy_and_add(nullptr, arg, 2); + grpc_channel_args_copy_and_add(nullptr, arg, 1); f = begin_test(config, "test_max_response_message_length", client_args, nullptr); @@ -771,14 +746,9 @@ static void test_max_receive_message_length_on_compressed_response( cqv.Verify(); GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method")); - if (minimal_stack) { - /* We do not perform message size checks for minimal stack. */ - GPR_ASSERT(status == GRPC_STATUS_OK); - } else { - GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED); - GPR_ASSERT(absl::StartsWith(grpc_core::StringViewFromSlice(details), - "Received message larger than max")); - } + GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED); + GPR_ASSERT(absl::StartsWith(grpc_core::StringViewFromSlice(details), + "Received message larger than max")); grpc_slice_unref(details); grpc_slice_unref(response_payload_slice); grpc_metadata_array_destroy(&initial_metadata_recv); @@ -824,10 +794,8 @@ void max_message_length(grpc_end2end_test_config config) { * with our simple proxy. */ if (strcmp(config.name, "inproc") != 0 && (config.feature_mask & FEATURE_MASK_SUPPORTS_REQUEST_PROXYING) == 0) { - test_max_receive_message_length_on_compressed_request(config, false); - test_max_receive_message_length_on_compressed_request(config, true); - test_max_receive_message_length_on_compressed_response(config, false); - test_max_receive_message_length_on_compressed_response(config, true); + test_max_receive_message_length_on_compressed_request(config); + test_max_receive_message_length_on_compressed_response(config); } } diff --git a/test/core/end2end/tests/streaming_error_response.cc b/test/core/end2end/tests/streaming_error_response.cc index 562041a7699..d42b5aa34d0 100644 --- a/test/core/end2end/tests/streaming_error_response.cc +++ b/test/core/end2end/tests/streaming_error_response.cc @@ -83,8 +83,8 @@ static void shutdown_client(grpc_end2end_test_fixture* f) { } static void end_test(grpc_end2end_test_fixture* f) { - shutdown_server(f); shutdown_client(f); + shutdown_server(f); grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index a518f852dd4..cb3627a2c6d 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -35,7 +35,7 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/deadline/deadline_filter.h" #include "src/core/ext/filters/http/client/http_client_filter.h" -#include "src/core/ext/filters/http/message_compress/message_compress_filter.h" +#include "src/core/ext/filters/http/message_compress/compression_filter.h" #include "src/core/ext/filters/http/server/http_server_filter.h" #include "src/core/ext/filters/message_size/message_size_filter.h" #include "src/core/lib/channel/channel_args.h" @@ -581,9 +581,10 @@ BENCHMARK_TEMPLATE(BM_IsolatedFilter, PhonyFilter, SendEmptyMetadata); typedef Fixture<&grpc_core::ClientChannel::kFilterVtable, 0> ClientChannelFilter; BENCHMARK_TEMPLATE(BM_IsolatedFilter, ClientChannelFilter, NoOp); -typedef Fixture<&grpc_message_compress_filter, CHECKS_NOT_LAST> CompressFilter; -BENCHMARK_TEMPLATE(BM_IsolatedFilter, CompressFilter, NoOp); -BENCHMARK_TEMPLATE(BM_IsolatedFilter, CompressFilter, SendEmptyMetadata); +typedef Fixture<&grpc_core::ClientCompressionFilter::kFilter, CHECKS_NOT_LAST> + ClientCompressFilter; +BENCHMARK_TEMPLATE(BM_IsolatedFilter, ClientCompressFilter, NoOp); +BENCHMARK_TEMPLATE(BM_IsolatedFilter, ClientCompressFilter, SendEmptyMetadata); typedef Fixture<&grpc_client_deadline_filter, CHECKS_NOT_LAST> ClientDeadlineFilter; BENCHMARK_TEMPLATE(BM_IsolatedFilter, ClientDeadlineFilter, NoOp); @@ -601,9 +602,10 @@ typedef Fixture<&grpc_core::HttpServerFilter::kFilter, CHECKS_NOT_LAST> HttpServerFilter; BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpServerFilter, NoOp); BENCHMARK_TEMPLATE(BM_IsolatedFilter, HttpServerFilter, SendEmptyMetadata); -typedef Fixture<&grpc_message_size_filter, CHECKS_NOT_LAST> MessageSizeFilter; -BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, NoOp); -BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, SendEmptyMetadata); +typedef Fixture<&grpc_core::ServerCompressionFilter::kFilter, CHECKS_NOT_LAST> + ServerCompressFilter; +BENCHMARK_TEMPLATE(BM_IsolatedFilter, ServerCompressFilter, NoOp); +BENCHMARK_TEMPLATE(BM_IsolatedFilter, ServerCompressFilter, SendEmptyMetadata); // This cmake target is disabled for now because it depends on OpenCensus, which // is Bazel-only. // typedef Fixture<&grpc_server_load_reporting_filter, CHECKS_NOT_LAST> diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 6c6d9bf610e..c8f209cc20c 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1172,10 +1172,8 @@ src/core/ext/filters/http/client/http_client_filter.h \ src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/client_authority_filter.h \ src/core/ext/filters/http/http_filters_plugin.cc \ -src/core/ext/filters/http/message_compress/message_compress_filter.cc \ -src/core/ext/filters/http/message_compress/message_compress_filter.h \ -src/core/ext/filters/http/message_compress/message_decompress_filter.cc \ -src/core/ext/filters/http/message_compress/message_decompress_filter.h \ +src/core/ext/filters/http/message_compress/compression_filter.cc \ +src/core/ext/filters/http/message_compress/compression_filter.h \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/http/server/http_server_filter.h \ src/core/ext/filters/message_size/message_size_filter.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 76345cf6f53..359f6fe1910 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -989,10 +989,8 @@ src/core/ext/filters/http/client/http_client_filter.h \ src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/client_authority_filter.h \ src/core/ext/filters/http/http_filters_plugin.cc \ -src/core/ext/filters/http/message_compress/message_compress_filter.cc \ -src/core/ext/filters/http/message_compress/message_compress_filter.h \ -src/core/ext/filters/http/message_compress/message_decompress_filter.cc \ -src/core/ext/filters/http/message_compress/message_decompress_filter.h \ +src/core/ext/filters/http/message_compress/compression_filter.cc \ +src/core/ext/filters/http/message_compress/compression_filter.h \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/http/server/http_server_filter.h \ src/core/ext/filters/message_size/message_size_filter.cc \ diff --git a/tools/profiling/microbenchmarks/bm_diff/bm_diff.py b/tools/profiling/microbenchmarks/bm_diff/bm_diff.py index 213dc3363f9..647547a0c6d 100755 --- a/tools/profiling/microbenchmarks/bm_diff/bm_diff.py +++ b/tools/profiling/microbenchmarks/bm_diff/bm_diff.py @@ -178,16 +178,18 @@ def diff(bms, loops, regex, track, old, new): js_old_opt = _read_json( '%s.%s.opt.%s.%d.json' % (bm, stripped_line, old, loop), badjson_files, nonexistant_files) - for row in bm_json.expand_json(js_new_opt): - name = row['cpp_name'] - if name.endswith('_mean') or name.endswith('_stddev'): - continue - benchmarks[name].add_sample(track, row, True) - for row in bm_json.expand_json(js_old_opt): - name = row['cpp_name'] - if name.endswith('_mean') or name.endswith('_stddev'): - continue - benchmarks[name].add_sample(track, row, False) + if js_new_opt: + for row in bm_json.expand_json(js_new_opt): + name = row['cpp_name'] + if name.endswith('_mean') or name.endswith('_stddev'): + continue + benchmarks[name].add_sample(track, row, True) + if js_old_opt: + for row in bm_json.expand_json(js_old_opt): + name = row['cpp_name'] + if name.endswith('_mean') or name.endswith('_stddev'): + continue + benchmarks[name].add_sample(track, row, False) really_interesting = set() for name, bm in benchmarks.items(): diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index ea50aab41eb..789ec3fdc5d 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -209,30 +209,6 @@ ], "uses_polling": true }, - { - "args": [], - "benchmark": false, - "ci_platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "gtest": false, - "language": "c", - "name": "minimal_stack_is_minimal_test", - "platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "uses_polling": false - }, { "args": [], "benchmark": false, @@ -4695,6 +4671,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "minimal_stack_is_minimal_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,