diff --git a/BUILD b/BUILD index d07866b8b9c..4c668a9e393 100644 --- a/BUILD +++ b/BUILD @@ -3590,11 +3590,13 @@ grpc_cc_library( "//src/core:ext/filters/http/client/http_client_filter.cc", "//src/core:ext/filters/http/http_filters_plugin.cc", "//src/core:ext/filters/http/message_compress/compression_filter.cc", + "//src/core:ext/filters/http/message_compress/legacy_compression_filter.cc", "//src/core:ext/filters/http/server/http_server_filter.cc", ], hdrs = [ "//src/core:ext/filters/http/client/http_client_filter.h", "//src/core:ext/filters/http/message_compress/compression_filter.h", + "//src/core:ext/filters/http/message_compress/legacy_compression_filter.h", "//src/core:ext/filters/http/server/http_server_filter.h", ], external_deps = [ @@ -3623,6 +3625,7 @@ grpc_cc_library( "//src/core:channel_fwd", "//src/core:channel_stack_type", "//src/core:context", + "//src/core:experiments", "//src/core:grpc_message_size_filter", "//src/core:latch", "//src/core:map", diff --git a/CMakeLists.txt b/CMakeLists.txt index 9aea587d790..90f0b5d3a4a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1826,6 +1826,7 @@ add_library(grpc src/core/ext/filters/http/client_authority_filter.cc src/core/ext/filters/http/http_filters_plugin.cc src/core/ext/filters/http/message_compress/compression_filter.cc + src/core/ext/filters/http/message_compress/legacy_compression_filter.cc src/core/ext/filters/http/server/http_server_filter.cc src/core/ext/filters/message_size/message_size_filter.cc src/core/ext/filters/rbac/rbac_filter.cc @@ -2882,6 +2883,7 @@ add_library(grpc_unsecure src/core/ext/filters/http/client_authority_filter.cc src/core/ext/filters/http/http_filters_plugin.cc src/core/ext/filters/http/message_compress/compression_filter.cc + src/core/ext/filters/http/message_compress/legacy_compression_filter.cc src/core/ext/filters/http/server/http_server_filter.cc src/core/ext/filters/message_size/message_size_filter.cc src/core/ext/transport/chttp2/client/chttp2_connector.cc diff --git a/Makefile b/Makefile index 372949e7b47..30eb03c8ef2 100644 --- a/Makefile +++ b/Makefile @@ -1028,6 +1028,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/http_filters_plugin.cc \ src/core/ext/filters/http/message_compress/compression_filter.cc \ + src/core/ext/filters/http/message_compress/legacy_compression_filter.cc \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \ src/core/ext/filters/rbac/rbac_filter.cc \ @@ -1934,6 +1935,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/http_filters_plugin.cc \ src/core/ext/filters/http/message_compress/compression_filter.cc \ + src/core/ext/filters/http/message_compress/legacy_compression_filter.cc \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \ src/core/ext/transport/chttp2/client/chttp2_connector.cc \ diff --git a/Package.swift b/Package.swift index a692977ac4e..512f70b00df 100644 --- a/Package.swift +++ b/Package.swift @@ -249,6 +249,8 @@ let package = Package( "src/core/ext/filters/http/http_filters_plugin.cc", "src/core/ext/filters/http/message_compress/compression_filter.cc", "src/core/ext/filters/http/message_compress/compression_filter.h", + "src/core/ext/filters/http/message_compress/legacy_compression_filter.cc", + "src/core/ext/filters/http/message_compress/legacy_compression_filter.h", "src/core/ext/filters/http/server/http_server_filter.cc", "src/core/ext/filters/http/server/http_server_filter.h", "src/core/ext/filters/message_size/message_size_filter.cc", diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index a1b5860a9c6..1b63ca0a1b2 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -51,6 +51,7 @@ EXPERIMENT_ENABLES = { "trace_record_callops": "trace_record_callops", "unconstrained_max_quota_buffer_size": "unconstrained_max_quota_buffer_size", "v3_channel_idle_filters": "v3_channel_idle_filters", + "v3_compression_filter": "v3_compression_filter", "work_serializer_clears_time_cache": "work_serializer_clears_time_cache", "work_serializer_dispatch": "work_serializer_dispatch", "write_size_policy": "write_size_policy", @@ -66,6 +67,9 @@ EXPERIMENTS = { "bad_client_test": [ "rfc_max_concurrent_streams", ], + "compression_test": [ + "v3_compression_filter", + ], "core_end2end_test": [ "promise_based_client_call", "promise_based_server_call", @@ -145,6 +149,9 @@ EXPERIMENTS = { "bad_client_test": [ "rfc_max_concurrent_streams", ], + "compression_test": [ + "v3_compression_filter", + ], "core_end2end_test": [ "promise_based_client_call", "promise_based_server_call", @@ -221,6 +228,9 @@ EXPERIMENTS = { "cancel_ares_query_test": [ "event_engine_dns", ], + "compression_test": [ + "v3_compression_filter", + ], "core_end2end_test": [ "event_engine_client", "promise_based_client_call", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 4789b3f5a46..1aeb1ad9026 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -276,6 +276,7 @@ libs: - src/core/ext/filters/http/client/http_client_filter.h - src/core/ext/filters/http/client_authority_filter.h - src/core/ext/filters/http/message_compress/compression_filter.h + - src/core/ext/filters/http/message_compress/legacy_compression_filter.h - src/core/ext/filters/http/server/http_server_filter.h - src/core/ext/filters/message_size/message_size_filter.h - src/core/ext/filters/rbac/rbac_filter.h @@ -1280,6 +1281,7 @@ libs: - src/core/ext/filters/http/client_authority_filter.cc - src/core/ext/filters/http/http_filters_plugin.cc - src/core/ext/filters/http/message_compress/compression_filter.cc + - src/core/ext/filters/http/message_compress/legacy_compression_filter.cc - src/core/ext/filters/http/server/http_server_filter.cc - src/core/ext/filters/message_size/message_size_filter.cc - src/core/ext/filters/rbac/rbac_filter.cc @@ -2210,6 +2212,7 @@ libs: - src/core/ext/filters/http/client/http_client_filter.h - src/core/ext/filters/http/client_authority_filter.h - src/core/ext/filters/http/message_compress/compression_filter.h + - src/core/ext/filters/http/message_compress/legacy_compression_filter.h - src/core/ext/filters/http/server/http_server_filter.h - src/core/ext/filters/message_size/message_size_filter.h - src/core/ext/transport/chttp2/client/chttp2_connector.h @@ -2689,6 +2692,7 @@ libs: - src/core/ext/filters/http/client_authority_filter.cc - src/core/ext/filters/http/http_filters_plugin.cc - src/core/ext/filters/http/message_compress/compression_filter.cc + - src/core/ext/filters/http/message_compress/legacy_compression_filter.cc - src/core/ext/filters/http/server/http_server_filter.cc - src/core/ext/filters/message_size/message_size_filter.cc - src/core/ext/transport/chttp2/client/chttp2_connector.cc diff --git a/config.m4 b/config.m4 index 0b2e030045f..146fbf7251c 100644 --- a/config.m4 +++ b/config.m4 @@ -115,6 +115,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/http/client_authority_filter.cc \ src/core/ext/filters/http/http_filters_plugin.cc \ src/core/ext/filters/http/message_compress/compression_filter.cc \ + src/core/ext/filters/http/message_compress/legacy_compression_filter.cc \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \ src/core/ext/filters/rbac/rbac_filter.cc \ diff --git a/config.w32 b/config.w32 index 1455ff6cc05..6fd1c3c50dd 100644 --- a/config.w32 +++ b/config.w32 @@ -80,6 +80,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\http\\client_authority_filter.cc " + "src\\core\\ext\\filters\\http\\http_filters_plugin.cc " + "src\\core\\ext\\filters\\http\\message_compress\\compression_filter.cc " + + "src\\core\\ext\\filters\\http\\message_compress\\legacy_compression_filter.cc " + "src\\core\\ext\\filters\\http\\server\\http_server_filter.cc " + "src\\core\\ext\\filters\\message_size\\message_size_filter.cc " + "src\\core\\ext\\filters\\rbac\\rbac_filter.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 9364a57164e..5bc1877ac30 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -309,6 +309,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/http/client/http_client_filter.h', 'src/core/ext/filters/http/client_authority_filter.h', 'src/core/ext/filters/http/message_compress/compression_filter.h', + 'src/core/ext/filters/http/message_compress/legacy_compression_filter.h', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/message_size/message_size_filter.h', 'src/core/ext/filters/rbac/rbac_filter.h', @@ -1562,6 +1563,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/http/client/http_client_filter.h', 'src/core/ext/filters/http/client_authority_filter.h', 'src/core/ext/filters/http/message_compress/compression_filter.h', + 'src/core/ext/filters/http/message_compress/legacy_compression_filter.h', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/message_size/message_size_filter.h', 'src/core/ext/filters/rbac/rbac_filter.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 0076a1b6311..b33c007214a 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -352,6 +352,8 @@ Pod::Spec.new do |s| 'src/core/ext/filters/http/http_filters_plugin.cc', 'src/core/ext/filters/http/message_compress/compression_filter.cc', 'src/core/ext/filters/http/message_compress/compression_filter.h', + 'src/core/ext/filters/http/message_compress/legacy_compression_filter.cc', + 'src/core/ext/filters/http/message_compress/legacy_compression_filter.h', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/message_size/message_size_filter.cc', @@ -2346,6 +2348,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/http/client/http_client_filter.h', 'src/core/ext/filters/http/client_authority_filter.h', 'src/core/ext/filters/http/message_compress/compression_filter.h', + 'src/core/ext/filters/http/message_compress/legacy_compression_filter.h', 'src/core/ext/filters/http/server/http_server_filter.h', 'src/core/ext/filters/message_size/message_size_filter.h', 'src/core/ext/filters/rbac/rbac_filter.h', diff --git a/grpc.gemspec b/grpc.gemspec index e5962295a73..e18172ead01 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -255,6 +255,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/http/http_filters_plugin.cc ) s.files += %w( src/core/ext/filters/http/message_compress/compression_filter.cc ) s.files += %w( src/core/ext/filters/http/message_compress/compression_filter.h ) + s.files += %w( src/core/ext/filters/http/message_compress/legacy_compression_filter.cc ) + s.files += %w( src/core/ext/filters/http/message_compress/legacy_compression_filter.h ) s.files += %w( src/core/ext/filters/http/server/http_server_filter.cc ) s.files += %w( src/core/ext/filters/http/server/http_server_filter.h ) s.files += %w( src/core/ext/filters/message_size/message_size_filter.cc ) diff --git a/grpc.gyp b/grpc.gyp index e68df91099f..fd09bd55d1e 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -348,6 +348,7 @@ 'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/http_filters_plugin.cc', 'src/core/ext/filters/http/message_compress/compression_filter.cc', + 'src/core/ext/filters/http/message_compress/legacy_compression_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', 'src/core/ext/filters/rbac/rbac_filter.cc', @@ -1196,6 +1197,7 @@ 'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/http_filters_plugin.cc', 'src/core/ext/filters/http/message_compress/compression_filter.cc', + 'src/core/ext/filters/http/message_compress/legacy_compression_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', 'src/core/ext/transport/chttp2/client/chttp2_connector.cc', diff --git a/package.xml b/package.xml index 3a2b2beb84e..f5724cbc512 100644 --- a/package.xml +++ b/package.xml @@ -237,6 +237,8 @@ + + diff --git a/src/core/ext/filters/http/http_filters_plugin.cc b/src/core/ext/filters/http/http_filters_plugin.cc index 04fb7f9707d..b8099c45c2f 100644 --- a/src/core/ext/filters/http/http_filters_plugin.cc +++ b/src/core/ext/filters/http/http_filters_plugin.cc @@ -22,10 +22,12 @@ #include "src/core/ext/filters/http/client/http_client_filter.h" #include "src/core/ext/filters/http/message_compress/compression_filter.h" +#include "src/core/ext/filters/http/message_compress/legacy_compression_filter.h" #include "src/core/ext/filters/http/server/http_server_filter.h" #include "src/core/ext/filters/message_size/message_size_filter.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/surface/channel_stack_type.h" #include "src/core/lib/transport/transport.h" @@ -38,20 +40,38 @@ bool IsBuildingHttpLikeTransport(const ChannelArgs& args) { } // namespace void RegisterHttpFilters(CoreConfiguration::Builder* builder) { - builder->channel_init() - ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, - &ClientCompressionFilter::kFilter) - .If(IsBuildingHttpLikeTransport) - .After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter}); - builder->channel_init() - ->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, - &ClientCompressionFilter::kFilter) - .If(IsBuildingHttpLikeTransport) - .After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter}); - builder->channel_init() - ->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerCompressionFilter::kFilter) - .If(IsBuildingHttpLikeTransport) - .After({&HttpServerFilter::kFilter, &ServerMessageSizeFilter::kFilter}); + if (IsV3CompressionFilterEnabled()) { + builder->channel_init() + ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, + &ClientCompressionFilter::kFilter) + .If(IsBuildingHttpLikeTransport) + .After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter}); + builder->channel_init() + ->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, + &ClientCompressionFilter::kFilter) + .If(IsBuildingHttpLikeTransport) + .After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter}); + builder->channel_init() + ->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerCompressionFilter::kFilter) + .If(IsBuildingHttpLikeTransport) + .After({&HttpServerFilter::kFilter, &ServerMessageSizeFilter::kFilter}); + } else { + builder->channel_init() + ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, + &LegacyClientCompressionFilter::kFilter) + .If(IsBuildingHttpLikeTransport) + .After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter}); + builder->channel_init() + ->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, + &LegacyClientCompressionFilter::kFilter) + .If(IsBuildingHttpLikeTransport) + .After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter}); + builder->channel_init() + ->RegisterFilter(GRPC_SERVER_CHANNEL, + &LegacyServerCompressionFilter::kFilter) + .If(IsBuildingHttpLikeTransport) + .After({&HttpServerFilter::kFilter, &ServerMessageSizeFilter::kFilter}); + } builder->channel_init() ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &HttpClientFilter::kFilter) .If(IsBuildingHttpLikeTransport) diff --git a/src/core/ext/filters/http/message_compress/compression_filter.cc b/src/core/ext/filters/http/message_compress/compression_filter.cc index b3c8d521cc9..1ffee085230 100644 --- a/src/core/ext/filters/http/message_compress/compression_filter.cc +++ b/src/core/ext/filters/http/message_compress/compression_filter.cc @@ -56,6 +56,11 @@ namespace grpc_core { +const NoInterceptor ServerCompressionFilter::Call::OnServerTrailingMetadata; +const NoInterceptor ServerCompressionFilter::Call::OnFinalize; +const NoInterceptor ClientCompressionFilter::Call::OnServerTrailingMetadata; +const NoInterceptor ClientCompressionFilter::Call::OnFinalize; + const grpc_channel_filter ClientCompressionFilter::kFilter = MakePromiseBasedFilter ServerCompressionFilter::Create( return ServerCompressionFilter(args); } -CompressionFilter::CompressionFilter(const ChannelArgs& args) +ChannelCompression::ChannelCompression(const ChannelArgs& args) : max_recv_size_(GetMaxRecvSizeFromChannelArgs(args)), message_size_service_config_parser_index_( MessageSizeParser::ParserIndex()), @@ -105,7 +110,7 @@ CompressionFilter::CompressionFilter(const ChannelArgs& args) } } -MessageHandle CompressionFilter::CompressMessage( +MessageHandle ChannelCompression::CompressMessage( MessageHandle message, grpc_compression_algorithm algorithm) const { if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { gpr_log(GPR_INFO, "CompressMessage: len=%" PRIdPTR " alg=%d flags=%d", @@ -163,7 +168,7 @@ MessageHandle CompressionFilter::CompressMessage( return message; } -absl::StatusOr CompressionFilter::DecompressMessage( +absl::StatusOr ChannelCompression::DecompressMessage( MessageHandle message, DecompressArgs args) const { if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { gpr_log(GPR_INFO, "DecompressMessage: len=%" PRIdPTR " max=%d alg=%d", @@ -208,7 +213,7 @@ absl::StatusOr CompressionFilter::DecompressMessage( return std::move(message); } -grpc_compression_algorithm CompressionFilter::HandleOutgoingMetadata( +grpc_compression_algorithm ChannelCompression::HandleOutgoingMetadata( grpc_metadata_batch& outgoing_metadata) { const auto algorithm = outgoing_metadata.Take(GrpcInternalEncodingRequest()) .value_or(default_compression_algorithm()); @@ -221,7 +226,7 @@ grpc_compression_algorithm CompressionFilter::HandleOutgoingMetadata( return algorithm; } -CompressionFilter::DecompressArgs CompressionFilter::HandleIncomingMetadata( +ChannelCompression::DecompressArgs ChannelCompression::HandleIncomingMetadata( const grpc_metadata_batch& incoming_metadata) { // Configure max receive size. auto max_recv_message_length = max_recv_size_; @@ -232,89 +237,59 @@ CompressionFilter::DecompressArgs CompressionFilter::HandleIncomingMetadata( if (limits != nullptr && limits->max_recv_size().has_value() && (!max_recv_message_length.has_value() || *limits->max_recv_size() < *max_recv_message_length)) { - max_recv_message_length = *limits->max_recv_size(); + max_recv_message_length = limits->max_recv_size(); } return DecompressArgs{incoming_metadata.get(GrpcEncodingMetadata()) .value_or(GRPC_COMPRESS_NONE), max_recv_message_length}; } -ArenaPromise ClientCompressionFilter::MakeCallPromise( - CallArgs call_args, NextPromiseFactory next_promise_factory) { - auto compression_algorithm = - HandleOutgoingMetadata(*call_args.client_initial_metadata); - call_args.client_to_server_messages->InterceptAndMap( - [compression_algorithm, - this](MessageHandle message) -> absl::optional { - return CompressMessage(std::move(message), compression_algorithm); - }); - auto* decompress_args = GetContext()->New( - DecompressArgs{GRPC_COMPRESS_ALGORITHMS_COUNT, absl::nullopt}); - auto* decompress_err = - GetContext()->New>(); - call_args.server_initial_metadata->InterceptAndMap( - [decompress_args, this](ServerMetadataHandle server_initial_metadata) - -> absl::optional { - if (server_initial_metadata == nullptr) return absl::nullopt; - *decompress_args = HandleIncomingMetadata(*server_initial_metadata); - return std::move(server_initial_metadata); - }); - call_args.server_to_client_messages->InterceptAndMap( - [decompress_err, decompress_args, - this](MessageHandle message) -> absl::optional { - auto r = DecompressMessage(std::move(message), *decompress_args); - if (!r.ok()) { - decompress_err->Set(ServerMetadataFromStatus(r.status())); - return absl::nullopt; - } - return std::move(*r); - }); - // Run the next filter, and race it with getting an error from decompression. - return PrioritizedRace(decompress_err->Wait(), - next_promise_factory(std::move(call_args))); +void ClientCompressionFilter::Call::OnClientInitialMetadata( + ClientMetadata& md, ClientCompressionFilter* filter) { + compression_algorithm_ = + filter->compression_engine_.HandleOutgoingMetadata(md); +} + +MessageHandle ClientCompressionFilter::Call::OnClientToServerMessage( + MessageHandle message, ClientCompressionFilter* filter) { + return filter->compression_engine_.CompressMessage(std::move(message), + compression_algorithm_); +} + +void ClientCompressionFilter::Call::OnServerInitialMetadata( + ServerMetadata& md, ClientCompressionFilter* filter) { + decompress_args_ = filter->compression_engine_.HandleIncomingMetadata(md); +} + +absl::StatusOr +ClientCompressionFilter::Call::OnServerToClientMessage( + MessageHandle message, ClientCompressionFilter* filter) { + return filter->compression_engine_.DecompressMessage(std::move(message), + decompress_args_); +} + +void ServerCompressionFilter::Call::OnClientInitialMetadata( + ClientMetadata& md, ServerCompressionFilter* filter) { + decompress_args_ = filter->compression_engine_.HandleIncomingMetadata(md); +} + +absl::StatusOr +ServerCompressionFilter::Call::OnClientToServerMessage( + MessageHandle message, ServerCompressionFilter* filter) { + return filter->compression_engine_.DecompressMessage(std::move(message), + decompress_args_); +} + +void ServerCompressionFilter::Call::OnServerInitialMetadata( + ServerMetadata& md, ServerCompressionFilter* filter) { + compression_algorithm_ = + filter->compression_engine_.HandleOutgoingMetadata(md); } -ArenaPromise ServerCompressionFilter::MakeCallPromise( - CallArgs call_args, NextPromiseFactory next_promise_factory) { - auto decompress_args = - HandleIncomingMetadata(*call_args.client_initial_metadata); - auto* decompress_err = - GetContext()->New>(); - call_args.client_to_server_messages->InterceptAndMap( - [decompress_err, decompress_args, - this](MessageHandle message) -> absl::optional { - auto r = DecompressMessage(std::move(message), decompress_args); - if (grpc_call_trace.enabled()) { - gpr_log(GPR_DEBUG, "%s[compression] DecompressMessage returned %s", - Activity::current()->DebugTag().c_str(), - r.status().ToString().c_str()); - } - if (!r.ok()) { - decompress_err->Set(ServerMetadataFromStatus(r.status())); - return absl::nullopt; - } - return std::move(*r); - }); - auto* compression_algorithm = - GetContext()->New(); - call_args.server_initial_metadata->InterceptAndMap( - [this, compression_algorithm](ServerMetadataHandle md) { - if (grpc_call_trace.enabled()) { - gpr_log(GPR_INFO, "%s[compression] Write metadata", - Activity::current()->DebugTag().c_str()); - } - // Find the compression algorithm. - *compression_algorithm = HandleOutgoingMetadata(*md); - return md; - }); - call_args.server_to_client_messages->InterceptAndMap( - [compression_algorithm, - this](MessageHandle message) -> absl::optional { - return CompressMessage(std::move(message), *compression_algorithm); - }); - // Run the next filter, and race it with getting an error from decompression. - return PrioritizedRace(decompress_err->Wait(), - next_promise_factory(std::move(call_args))); +MessageHandle ServerCompressionFilter::Call::OnServerToClientMessage( + MessageHandle message, ServerCompressionFilter* filter) { + return filter->compression_engine_.CompressMessage(std::move(message), + compression_algorithm_); } } // namespace grpc_core diff --git a/src/core/ext/filters/http/message_compress/compression_filter.h b/src/core/ext/filters/http/message_compress/compression_filter.h index caf49c03336..40e88d9b5a8 100644 --- a/src/core/ext/filters/http/message_compress/compression_filter.h +++ b/src/core/ext/filters/http/message_compress/compression_filter.h @@ -61,15 +61,15 @@ namespace grpc_core { /// the aforementioned 'grpc-encoding' metadata value, data will pass through /// uncompressed. -class CompressionFilter : public ChannelFilter { - protected: +class ChannelCompression { + public: + explicit ChannelCompression(const ChannelArgs& args); + struct DecompressArgs { grpc_compression_algorithm algorithm; absl::optional max_recv_message_length; }; - explicit CompressionFilter(const ChannelArgs& args); - grpc_compression_algorithm default_compression_algorithm() const { return default_compression_algorithm_; } @@ -104,7 +104,8 @@ class CompressionFilter : public ChannelFilter { bool enable_decompression_; }; -class ClientCompressionFilter final : public CompressionFilter { +class ClientCompressionFilter final + : public ImplementChannelFilter { public: static const grpc_channel_filter kFilter; @@ -112,14 +113,35 @@ class ClientCompressionFilter final : public CompressionFilter { const ChannelArgs& args, ChannelFilter::Args filter_args); // Construct a promise for one call. - ArenaPromise MakeCallPromise( - CallArgs call_args, NextPromiseFactory next_promise_factory) override; + class Call { + public: + void OnClientInitialMetadata(ClientMetadata& md, + ClientCompressionFilter* filter); + MessageHandle OnClientToServerMessage(MessageHandle message, + ClientCompressionFilter* filter); + + void OnServerInitialMetadata(ServerMetadata& md, + ClientCompressionFilter* filter); + absl::StatusOr OnServerToClientMessage( + MessageHandle message, ClientCompressionFilter* filter); + + static const NoInterceptor OnServerTrailingMetadata; + static const NoInterceptor OnFinalize; + + private: + grpc_compression_algorithm compression_algorithm_; + ChannelCompression::DecompressArgs decompress_args_; + }; private: - using CompressionFilter::CompressionFilter; + explicit ClientCompressionFilter(const ChannelArgs& args) + : compression_engine_(args) {} + + ChannelCompression compression_engine_; }; -class ServerCompressionFilter final : public CompressionFilter { +class ServerCompressionFilter final + : public ImplementChannelFilter { public: static const grpc_channel_filter kFilter; @@ -127,11 +149,31 @@ class ServerCompressionFilter final : public CompressionFilter { const ChannelArgs& args, ChannelFilter::Args filter_args); // Construct a promise for one call. - ArenaPromise MakeCallPromise( - CallArgs call_args, NextPromiseFactory next_promise_factory) override; + class Call { + public: + void OnClientInitialMetadata(ClientMetadata& md, + ServerCompressionFilter* filter); + absl::StatusOr OnClientToServerMessage( + MessageHandle message, ServerCompressionFilter* filter); + + void OnServerInitialMetadata(ServerMetadata& md, + ServerCompressionFilter* filter); + MessageHandle OnServerToClientMessage(MessageHandle message, + ServerCompressionFilter* filter); + + static const NoInterceptor OnServerTrailingMetadata; + static const NoInterceptor OnFinalize; + + private: + ChannelCompression::DecompressArgs decompress_args_; + grpc_compression_algorithm compression_algorithm_; + }; private: - using CompressionFilter::CompressionFilter; + explicit ServerCompressionFilter(const ChannelArgs& args) + : compression_engine_(args) {} + + ChannelCompression compression_engine_; }; } // namespace grpc_core diff --git a/src/core/ext/filters/http/message_compress/legacy_compression_filter.cc b/src/core/ext/filters/http/message_compress/legacy_compression_filter.cc new file mode 100644 index 00000000000..71f9a6d9207 --- /dev/null +++ b/src/core/ext/filters/http/message_compress/legacy_compression_filter.cc @@ -0,0 +1,325 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/ext/filters/http/message_compress/legacy_compression_filter.h" + +#include + +#include +#include +#include + +#include "absl/status/status.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/str_format.h" +#include "absl/types/optional.h" + +#include +#include +#include +#include +#include + +#include "src/core/ext/filters/message_size/message_size_filter.h" +#include "src/core/lib/channel/call_tracer.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/context.h" +#include "src/core/lib/channel/promise_based_filter.h" +#include "src/core/lib/compression/compression_internal.h" +#include "src/core/lib/compression/message_compress.h" +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/context.h" +#include "src/core/lib/promise/latch.h" +#include "src/core/lib/promise/pipe.h" +#include "src/core/lib/promise/prioritized_race.h" +#include "src/core/lib/resource_quota/arena.h" +#include "src/core/lib/slice/slice_buffer.h" +#include "src/core/lib/surface/call.h" +#include "src/core/lib/surface/call_trace.h" +#include "src/core/lib/transport/metadata_batch.h" +#include "src/core/lib/transport/transport.h" + +namespace grpc_core { + +const grpc_channel_filter LegacyClientCompressionFilter::kFilter = + MakePromiseBasedFilter< + LegacyClientCompressionFilter, FilterEndpoint::kClient, + kFilterExaminesServerInitialMetadata | kFilterExaminesInboundMessages | + kFilterExaminesOutboundMessages>("compression"); +const grpc_channel_filter LegacyServerCompressionFilter::kFilter = + MakePromiseBasedFilter< + LegacyServerCompressionFilter, FilterEndpoint::kServer, + kFilterExaminesServerInitialMetadata | kFilterExaminesInboundMessages | + kFilterExaminesOutboundMessages>("compression"); + +absl::StatusOr +LegacyClientCompressionFilter::Create(const ChannelArgs& args, + ChannelFilter::Args) { + return LegacyClientCompressionFilter(args); +} + +absl::StatusOr +LegacyServerCompressionFilter::Create(const ChannelArgs& args, + ChannelFilter::Args) { + return LegacyServerCompressionFilter(args); +} + +LegacyCompressionFilter::LegacyCompressionFilter(const ChannelArgs& args) + : max_recv_size_(GetMaxRecvSizeFromChannelArgs(args)), + message_size_service_config_parser_index_( + MessageSizeParser::ParserIndex()), + default_compression_algorithm_( + DefaultCompressionAlgorithmFromChannelArgs(args).value_or( + GRPC_COMPRESS_NONE)), + enabled_compression_algorithms_( + CompressionAlgorithmSet::FromChannelArgs(args)), + enable_compression_( + args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION).value_or(true)), + enable_decompression_( + args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION) + .value_or(true)) { + // Make sure the default is enabled. + if (!enabled_compression_algorithms_.IsSet(default_compression_algorithm_)) { + const char* name; + if (!grpc_compression_algorithm_name(default_compression_algorithm_, + &name)) { + name = ""; + } + gpr_log(GPR_ERROR, + "default compression algorithm %s not enabled: switching to none", + name); + default_compression_algorithm_ = GRPC_COMPRESS_NONE; + } +} + +MessageHandle LegacyCompressionFilter::CompressMessage( + MessageHandle message, grpc_compression_algorithm algorithm) const { + if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { + gpr_log(GPR_INFO, "CompressMessage: len=%" PRIdPTR " alg=%d flags=%d", + message->payload()->Length(), algorithm, message->flags()); + } + auto* call_context = GetContext(); + auto* call_tracer = static_cast( + call_context[GRPC_CONTEXT_CALL_TRACER].value); + if (call_tracer != nullptr) { + call_tracer->RecordSendMessage(*message->payload()); + } + // Check if we're allowed to compress this message + // (apps might want to disable compression for certain messages to avoid + // crime/beast like vulns). + uint32_t& flags = message->mutable_flags(); + if (algorithm == GRPC_COMPRESS_NONE || !enable_compression_ || + (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS))) { + return message; + } + // Try to compress the payload. + SliceBuffer tmp; + SliceBuffer* payload = message->payload(); + bool did_compress = grpc_msg_compress(algorithm, payload->c_slice_buffer(), + tmp.c_slice_buffer()); + // If we achieved compression send it as compressed, otherwise send it as (to + // avoid spending cycles on the receiver decompressing). + if (did_compress) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { + const char* algo_name; + const size_t before_size = payload->Length(); + const size_t after_size = tmp.Length(); + const float savings_ratio = 1.0f - static_cast(after_size) / + static_cast(before_size); + GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name)); + gpr_log(GPR_INFO, + "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR + " bytes (%.2f%% savings)", + algo_name, before_size, after_size, 100 * savings_ratio); + } + tmp.Swap(payload); + flags |= GRPC_WRITE_INTERNAL_COMPRESS; + if (call_tracer != nullptr) { + call_tracer->RecordSendCompressedMessage(*message->payload()); + } + } else { + if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { + const char* algo_name; + GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name)); + gpr_log(GPR_INFO, + "Algorithm '%s' enabled but decided not to compress. Input size: " + "%" PRIuPTR, + algo_name, payload->Length()); + } + } + return message; +} + +absl::StatusOr LegacyCompressionFilter::DecompressMessage( + MessageHandle message, DecompressArgs args) const { + if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { + gpr_log(GPR_INFO, "DecompressMessage: len=%" PRIdPTR " max=%d alg=%d", + message->payload()->Length(), + args.max_recv_message_length.value_or(-1), args.algorithm); + } + auto* call_context = GetContext(); + auto* call_tracer = static_cast( + call_context[GRPC_CONTEXT_CALL_TRACER].value); + if (call_tracer != nullptr) { + call_tracer->RecordReceivedMessage(*message->payload()); + } + // Check max message length. + if (args.max_recv_message_length.has_value() && + message->payload()->Length() > + static_cast(*args.max_recv_message_length)) { + return absl::ResourceExhaustedError(absl::StrFormat( + "Received message larger than max (%u vs. %d)", + message->payload()->Length(), *args.max_recv_message_length)); + } + // Check if decompression is enabled (if not, we can just pass the message + // up). + if (!enable_decompression_ || + (message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) == 0) { + return std::move(message); + } + // Try to decompress the payload. + SliceBuffer decompressed_slices; + if (grpc_msg_decompress(args.algorithm, message->payload()->c_slice_buffer(), + decompressed_slices.c_slice_buffer()) == 0) { + return absl::InternalError( + absl::StrCat("Unexpected error decompressing data for algorithm ", + CompressionAlgorithmAsString(args.algorithm))); + } + // Swap the decompressed slices into the message. + message->payload()->Swap(&decompressed_slices); + message->mutable_flags() &= ~GRPC_WRITE_INTERNAL_COMPRESS; + message->mutable_flags() |= GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED; + if (call_tracer != nullptr) { + call_tracer->RecordReceivedDecompressedMessage(*message->payload()); + } + return std::move(message); +} + +grpc_compression_algorithm LegacyCompressionFilter::HandleOutgoingMetadata( + grpc_metadata_batch& outgoing_metadata) { + const auto algorithm = outgoing_metadata.Take(GrpcInternalEncodingRequest()) + .value_or(default_compression_algorithm()); + // Convey supported compression algorithms. + outgoing_metadata.Set(GrpcAcceptEncodingMetadata(), + enabled_compression_algorithms()); + if (algorithm != GRPC_COMPRESS_NONE) { + outgoing_metadata.Set(GrpcEncodingMetadata(), algorithm); + } + return algorithm; +} + +LegacyCompressionFilter::DecompressArgs +LegacyCompressionFilter::HandleIncomingMetadata( + const grpc_metadata_batch& incoming_metadata) { + // Configure max receive size. + auto max_recv_message_length = max_recv_size_; + const MessageSizeParsedConfig* limits = + MessageSizeParsedConfig::GetFromCallContext( + GetContext(), + message_size_service_config_parser_index_); + if (limits != nullptr && limits->max_recv_size().has_value() && + (!max_recv_message_length.has_value() || + *limits->max_recv_size() < *max_recv_message_length)) { + max_recv_message_length = *limits->max_recv_size(); + } + return DecompressArgs{incoming_metadata.get(GrpcEncodingMetadata()) + .value_or(GRPC_COMPRESS_NONE), + max_recv_message_length}; +} + +ArenaPromise +LegacyClientCompressionFilter::MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) { + auto compression_algorithm = + HandleOutgoingMetadata(*call_args.client_initial_metadata); + call_args.client_to_server_messages->InterceptAndMap( + [compression_algorithm, + this](MessageHandle message) -> absl::optional { + return CompressMessage(std::move(message), compression_algorithm); + }); + auto* decompress_args = GetContext()->New( + DecompressArgs{GRPC_COMPRESS_ALGORITHMS_COUNT, absl::nullopt}); + auto* decompress_err = + GetContext()->New>(); + call_args.server_initial_metadata->InterceptAndMap( + [decompress_args, this](ServerMetadataHandle server_initial_metadata) + -> absl::optional { + if (server_initial_metadata == nullptr) return absl::nullopt; + *decompress_args = HandleIncomingMetadata(*server_initial_metadata); + return std::move(server_initial_metadata); + }); + call_args.server_to_client_messages->InterceptAndMap( + [decompress_err, decompress_args, + this](MessageHandle message) -> absl::optional { + auto r = DecompressMessage(std::move(message), *decompress_args); + if (!r.ok()) { + decompress_err->Set(ServerMetadataFromStatus(r.status())); + return absl::nullopt; + } + return std::move(*r); + }); + // Run the next filter, and race it with getting an error from decompression. + return PrioritizedRace(decompress_err->Wait(), + next_promise_factory(std::move(call_args))); +} + +ArenaPromise +LegacyServerCompressionFilter::MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) { + auto decompress_args = + HandleIncomingMetadata(*call_args.client_initial_metadata); + auto* decompress_err = + GetContext()->New>(); + call_args.client_to_server_messages->InterceptAndMap( + [decompress_err, decompress_args, + this](MessageHandle message) -> absl::optional { + auto r = DecompressMessage(std::move(message), decompress_args); + if (grpc_call_trace.enabled()) { + gpr_log(GPR_DEBUG, "%s[compression] DecompressMessage returned %s", + Activity::current()->DebugTag().c_str(), + r.status().ToString().c_str()); + } + if (!r.ok()) { + decompress_err->Set(ServerMetadataFromStatus(r.status())); + return absl::nullopt; + } + return std::move(*r); + }); + auto* compression_algorithm = + GetContext()->New(); + call_args.server_initial_metadata->InterceptAndMap( + [this, compression_algorithm](ServerMetadataHandle md) { + if (grpc_call_trace.enabled()) { + gpr_log(GPR_INFO, "%s[compression] Write metadata", + Activity::current()->DebugTag().c_str()); + } + // Find the compression algorithm. + *compression_algorithm = HandleOutgoingMetadata(*md); + return md; + }); + call_args.server_to_client_messages->InterceptAndMap( + [compression_algorithm, + this](MessageHandle message) -> absl::optional { + return CompressMessage(std::move(message), *compression_algorithm); + }); + // Run the next filter, and race it with getting an error from decompression. + return PrioritizedRace(decompress_err->Wait(), + next_promise_factory(std::move(call_args))); +} + +} // namespace grpc_core diff --git a/src/core/ext/filters/http/message_compress/legacy_compression_filter.h b/src/core/ext/filters/http/message_compress/legacy_compression_filter.h new file mode 100644 index 00000000000..0926bc09ed9 --- /dev/null +++ b/src/core/ext/filters/http/message_compress/legacy_compression_filter.h @@ -0,0 +1,139 @@ +// +// +// Copyright 2020 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPC_SRC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_LEGACY_COMPRESSION_FILTER_H +#define GRPC_SRC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_LEGACY_COMPRESSION_FILTER_H + +#include + +#include +#include + +#include "absl/status/statusor.h" +#include "absl/types/optional.h" + +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_fwd.h" +#include "src/core/lib/channel/promise_based_filter.h" +#include "src/core/lib/compression/compression_internal.h" +#include "src/core/lib/promise/arena_promise.h" +#include "src/core/lib/transport/metadata_batch.h" +#include "src/core/lib/transport/transport.h" + +namespace grpc_core { + +/// Compression filter for messages. +/// +/// See for the available compression settings. +/// +/// Compression settings may come from: +/// - Channel configuration, as established at channel creation time. +/// - The metadata accompanying the outgoing data to be compressed. This is +/// taken as a request only. We may choose not to honor it. The metadata key +/// is given by \a GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY. +/// +/// Compression can be disabled for concrete messages (for instance in order to +/// prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set +/// in the MessageHandle flags. +/// +/// The attempted compression mechanism is added to the resulting initial +/// metadata under the 'grpc-encoding' key. +/// +/// If compression is actually performed, the MessageHandle's flag is modified +/// to incorporate GRPC_WRITE_INTERNAL_COMPRESS. Otherwise, and regardless of +/// the aforementioned 'grpc-encoding' metadata value, data will pass through +/// uncompressed. + +class LegacyCompressionFilter : public ChannelFilter { + protected: + struct DecompressArgs { + grpc_compression_algorithm algorithm; + absl::optional max_recv_message_length; + }; + + explicit LegacyCompressionFilter(const ChannelArgs& args); + + grpc_compression_algorithm default_compression_algorithm() const { + return default_compression_algorithm_; + } + + CompressionAlgorithmSet enabled_compression_algorithms() const { + return enabled_compression_algorithms_; + } + + grpc_compression_algorithm HandleOutgoingMetadata( + grpc_metadata_batch& outgoing_metadata); + DecompressArgs HandleIncomingMetadata( + const grpc_metadata_batch& incoming_metadata); + + // Compress one message synchronously. + MessageHandle CompressMessage(MessageHandle message, + grpc_compression_algorithm algorithm) const; + // Decompress one message synchronously. + absl::StatusOr DecompressMessage(MessageHandle message, + DecompressArgs args) const; + + private: + // Max receive message length, if set. + absl::optional max_recv_size_; + size_t message_size_service_config_parser_index_; + // The default, channel-level, compression algorithm. + grpc_compression_algorithm default_compression_algorithm_; + // Enabled compression algorithms. + CompressionAlgorithmSet enabled_compression_algorithms_; + // Is compression enabled? + bool enable_compression_; + // Is decompression enabled? + bool enable_decompression_; +}; + +class LegacyClientCompressionFilter final : public LegacyCompressionFilter { + public: + static const grpc_channel_filter kFilter; + + static absl::StatusOr Create( + const ChannelArgs& args, ChannelFilter::Args filter_args); + + // Construct a promise for one call. + ArenaPromise MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) override; + + private: + using LegacyCompressionFilter::LegacyCompressionFilter; +}; + +class LegacyServerCompressionFilter final : public LegacyCompressionFilter { + public: + static const grpc_channel_filter kFilter; + + static absl::StatusOr Create( + const ChannelArgs& args, ChannelFilter::Args filter_args); + + // Construct a promise for one call. + ArenaPromise MakeCallPromise( + CallArgs call_args, NextPromiseFactory next_promise_factory) override; + + private: + using LegacyCompressionFilter::LegacyCompressionFilter; +}; + +} // namespace grpc_core + +#endif // GRPC_SRC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_LEGACY_COMPRESSION_FILTER_H diff --git a/src/core/lib/channel/promise_based_filter.h b/src/core/lib/channel/promise_based_filter.h index ad33f366378..b455ab869b9 100644 --- a/src/core/lib/channel/promise_based_filter.h +++ b/src/core/lib/channel/promise_based_filter.h @@ -139,11 +139,21 @@ inline constexpr bool HasAsyncErrorInterceptor(const NoInterceptor*) { return false; } +template +inline constexpr bool HasAsyncErrorInterceptor(A0 (T::*)(A0, As...)) { + return false; +} + template inline constexpr bool HasAsyncErrorInterceptor(absl::Status (T::*)(A...)) { return true; } +template +inline constexpr bool HasAsyncErrorInterceptor(absl::StatusOr (T::*)(A...)) { + return true; +} + template inline constexpr bool HasAsyncErrorInterceptor( ServerMetadataHandle (T::*)(A...)) { @@ -404,6 +414,34 @@ inline void InterceptClientToServerMessage( }); } +template +inline void InterceptClientToServerMessage( + MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*), + FilterCallData* call_data, const CallArgs& call_args) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage); + call_args.client_to_server_messages->InterceptAndMap( + [call_data](MessageHandle msg) -> absl::optional { + return call_data->call.OnClientToServerMessage(std::move(msg), + call_data->channel); + }); +} + +template +inline void InterceptClientToServerMessage( + absl::StatusOr (Derived::Call::*fn)(MessageHandle, Derived*), + FilterCallData* call_data, const CallArgs& call_args) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage); + call_args.client_to_server_messages->InterceptAndMap( + [call_data](MessageHandle msg) -> absl::optional { + auto r = call_data->call.OnClientToServerMessage(std::move(msg), + call_data->channel); + if (r.ok()) return std::move(*r); + if (call_data->error_latch.is_set()) return absl::nullopt; + call_data->error_latch.Set(ServerMetadataFromStatus(r.status())); + return absl::nullopt; + }); +} + inline void InterceptClientToServerMessage(const NoInterceptor*, void*, void*, CallSpineInterface*) {} @@ -412,7 +450,7 @@ inline void InterceptClientToServerMessage( ServerMetadataHandle (Derived::Call::*fn)(const Message&), typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage); - call_spine->server_to_client_messages().sender.InterceptAndMap( + call_spine->client_to_server_messages().receiver.InterceptAndMap( [call, call_spine](MessageHandle msg) -> absl::optional { auto return_md = call->OnClientToServerMessage(*msg); if (return_md == nullptr) return std::move(msg); @@ -426,7 +464,7 @@ inline void InterceptClientToServerMessage( typename Derived::Call* call, Derived* channel, CallSpineInterface* call_spine) { GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage); - call_spine->server_to_client_messages().sender.InterceptAndMap( + call_spine->client_to_server_messages().receiver.InterceptAndMap( [call, call_spine, channel](MessageHandle msg) -> absl::optional { auto return_md = call->OnClientToServerMessage(*msg, channel); @@ -435,6 +473,33 @@ inline void InterceptClientToServerMessage( }); } +template +inline void InterceptClientToServerMessage( + MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*), + typename Derived::Call* call, Derived* channel, + CallSpineInterface* call_spine) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage); + call_spine->client_to_server_messages().receiver.InterceptAndMap( + [call, channel](MessageHandle msg) { + return call->OnClientToServerMessage(std::move(msg), channel); + }); +} + +template +inline void InterceptClientToServerMessage( + absl::StatusOr (Derived::Call::*fn)(MessageHandle, Derived*), + typename Derived::Call* call, Derived* channel, + CallSpineInterface* call_spine) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage); + call_spine->client_to_server_messages().receiver.InterceptAndMap( + [call, call_spine, + channel](MessageHandle msg) -> absl::optional { + auto r = call->OnClientToServerMessage(std::move(msg), channel); + if (r.ok()) return std::move(*r); + return call_spine->Cancel(ServerMetadataFromStatus(r.status())); + }); +} + inline void InterceptClientInitialMetadata(const NoInterceptor*, void*, void*, CallSpineInterface*) {} @@ -526,6 +591,36 @@ inline void InterceptServerInitialMetadata( }); } +template +inline void InterceptServerInitialMetadata( + void (Derived::Call::*fn)(ServerMetadata&, Derived*), + FilterCallData* call_data, const CallArgs& call_args) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata); + call_args.server_initial_metadata->InterceptAndMap( + [call_data](ServerMetadataHandle md) { + call_data->call.OnServerInitialMetadata(*md, call_data->channel); + return md; + }); +} + +template +inline void InterceptServerInitialMetadata( + absl::Status (Derived::Call::*fn)(ServerMetadata&, Derived*), + FilterCallData* call_data, const CallArgs& call_args) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata); + call_args.server_initial_metadata->InterceptAndMap( + [call_data]( + ServerMetadataHandle md) -> absl::optional { + auto status = + call_data->call.OnServerInitialMetadata(*md, call_data->channel); + if (!status.ok() && !call_data->error_latch.is_set()) { + call_data->error_latch.Set(ServerMetadataFromStatus(status)); + return absl::nullopt; + } + return std::move(md); + }); +} + inline void InterceptServerInitialMetadata(const NoInterceptor*, void*, void*, CallSpineInterface*) {} @@ -555,6 +650,34 @@ inline void InterceptServerInitialMetadata( }); } +template +inline void InterceptServerInitialMetadata( + void (Derived::Call::*fn)(ServerMetadata&, Derived*), + typename Derived::Call* call, Derived* channel, + CallSpineInterface* call_spine) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata); + call_spine->server_initial_metadata().sender.InterceptAndMap( + [call, channel](ServerMetadataHandle md) { + call->OnServerInitialMetadata(*md, channel); + return md; + }); +} + +template +inline void InterceptServerInitialMetadata( + absl::Status (Derived::Call::*fn)(ServerMetadata&, Derived*), + typename Derived::Call* call, Derived* channel, + CallSpineInterface* call_spine) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata); + call_spine->server_initial_metadata().sender.InterceptAndMap( + [call, call_spine, channel]( + ServerMetadataHandle md) -> absl::optional { + auto status = call->OnServerInitialMetadata(*md, channel); + if (status.ok()) return std::move(md); + return call_spine->Cancel(ServerMetadataFromStatus(status)); + }); +} + inline void InterceptServerToClientMessage(const NoInterceptor*, void*, const CallArgs&) {} @@ -589,6 +712,34 @@ inline void InterceptServerToClientMessage( }); } +template +inline void InterceptServerToClientMessage( + MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*), + FilterCallData* call_data, const CallArgs& call_args) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage); + call_args.server_to_client_messages->InterceptAndMap( + [call_data](MessageHandle msg) -> absl::optional { + return call_data->call.OnServerToClientMessage(std::move(msg), + call_data->channel); + }); +} + +template +inline void InterceptServerToClientMessage( + absl::StatusOr (Derived::Call::*fn)(MessageHandle, Derived*), + FilterCallData* call_data, const CallArgs& call_args) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage); + call_args.server_to_client_messages->InterceptAndMap( + [call_data](MessageHandle msg) -> absl::optional { + auto r = call_data->call.OnServerToClientMessage(std::move(msg), + call_data->channel); + if (r.ok()) return std::move(*r); + if (call_data->error_latch.is_set()) return absl::nullopt; + call_data->error_latch.Set(ServerMetadataFromStatus(r.status())); + return absl::nullopt; + }); +} + inline void InterceptServerToClientMessage(const NoInterceptor*, void*, void*, CallSpineInterface*) {} @@ -620,6 +771,33 @@ inline void InterceptServerToClientMessage( }); } +template +inline void InterceptServerToClientMessage( + MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*), + typename Derived::Call* call, Derived* channel, + CallSpineInterface* call_spine) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage); + call_spine->server_to_client_messages().sender.InterceptAndMap( + [call, channel](MessageHandle msg) { + return call->OnServerToClientMessage(std::move(msg), channel); + }); +} + +template +inline void InterceptServerToClientMessage( + absl::StatusOr (Derived::Call::*fn)(MessageHandle, Derived*), + typename Derived::Call* call, Derived* channel, + CallSpineInterface* call_spine) { + GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage); + call_spine->server_to_client_messages().sender.InterceptAndMap( + [call, call_spine, + channel](MessageHandle msg) -> absl::optional { + auto r = call->OnServerToClientMessage(std::move(msg), channel); + if (r.ok()) return std::move(*r); + return call_spine->Cancel(ServerMetadataFromStatus(r.status())); + }); +} + inline void InterceptServerTrailingMetadata(const NoInterceptor*, void*, void*, CallSpineInterface*) {} diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index a49b5554e88..91415120585 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -155,6 +155,9 @@ const char* const additional_constraints_unconstrained_max_quota_buffer_size = const char* const description_v3_channel_idle_filters = "Use the v3 filter API version of the idle filters."; const char* const additional_constraints_v3_channel_idle_filters = "{}"; +const char* const description_v3_compression_filter = + "Use the compression filter utilizing the v3 filter api"; +const char* const additional_constraints_v3_compression_filter = "{}"; const char* const description_work_serializer_clears_time_cache = "Have the work serializer clear the time cache when it dispatches work."; const char* const additional_constraints_work_serializer_clears_time_cache = @@ -269,6 +272,8 @@ const ExperimentMetadata g_experiment_metadata[] = { false, true}, {"v3_channel_idle_filters", description_v3_channel_idle_filters, additional_constraints_v3_channel_idle_filters, nullptr, 0, false, true}, + {"v3_compression_filter", description_v3_compression_filter, + additional_constraints_v3_compression_filter, nullptr, 0, false, true}, {"work_serializer_clears_time_cache", description_work_serializer_clears_time_cache, additional_constraints_work_serializer_clears_time_cache, nullptr, 0, true, @@ -419,6 +424,9 @@ const char* const additional_constraints_unconstrained_max_quota_buffer_size = const char* const description_v3_channel_idle_filters = "Use the v3 filter API version of the idle filters."; const char* const additional_constraints_v3_channel_idle_filters = "{}"; +const char* const description_v3_compression_filter = + "Use the compression filter utilizing the v3 filter api"; +const char* const additional_constraints_v3_compression_filter = "{}"; const char* const description_work_serializer_clears_time_cache = "Have the work serializer clear the time cache when it dispatches work."; const char* const additional_constraints_work_serializer_clears_time_cache = @@ -533,6 +541,8 @@ const ExperimentMetadata g_experiment_metadata[] = { false, true}, {"v3_channel_idle_filters", description_v3_channel_idle_filters, additional_constraints_v3_channel_idle_filters, nullptr, 0, false, true}, + {"v3_compression_filter", description_v3_compression_filter, + additional_constraints_v3_compression_filter, nullptr, 0, false, true}, {"work_serializer_clears_time_cache", description_work_serializer_clears_time_cache, additional_constraints_work_serializer_clears_time_cache, nullptr, 0, true, @@ -683,6 +693,9 @@ const char* const additional_constraints_unconstrained_max_quota_buffer_size = const char* const description_v3_channel_idle_filters = "Use the v3 filter API version of the idle filters."; const char* const additional_constraints_v3_channel_idle_filters = "{}"; +const char* const description_v3_compression_filter = + "Use the compression filter utilizing the v3 filter api"; +const char* const additional_constraints_v3_compression_filter = "{}"; const char* const description_work_serializer_clears_time_cache = "Have the work serializer clear the time cache when it dispatches work."; const char* const additional_constraints_work_serializer_clears_time_cache = @@ -797,6 +810,8 @@ const ExperimentMetadata g_experiment_metadata[] = { false, true}, {"v3_channel_idle_filters", description_v3_channel_idle_filters, additional_constraints_v3_channel_idle_filters, nullptr, 0, false, true}, + {"v3_compression_filter", description_v3_compression_filter, + additional_constraints_v3_compression_filter, nullptr, 0, false, true}, {"work_serializer_clears_time_cache", description_work_serializer_clears_time_cache, additional_constraints_work_serializer_clears_time_cache, nullptr, 0, true, diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index 2e680ac8ead..d337d9da2fa 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -108,6 +108,7 @@ inline bool IsTcpRcvLowatEnabled() { return false; } inline bool IsTraceRecordCallopsEnabled() { return false; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } inline bool IsV3ChannelIdleFiltersEnabled() { return false; } +inline bool IsV3CompressionFilterEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } inline bool IsWorkSerializerDispatchEnabled() { return false; } @@ -171,6 +172,7 @@ inline bool IsTcpRcvLowatEnabled() { return false; } inline bool IsTraceRecordCallopsEnabled() { return false; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } inline bool IsV3ChannelIdleFiltersEnabled() { return false; } +inline bool IsV3CompressionFilterEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } inline bool IsWorkSerializerDispatchEnabled() { return false; } @@ -234,6 +236,7 @@ inline bool IsTcpRcvLowatEnabled() { return false; } inline bool IsTraceRecordCallopsEnabled() { return false; } inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; } inline bool IsV3ChannelIdleFiltersEnabled() { return false; } +inline bool IsV3CompressionFilterEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; } inline bool IsWorkSerializerDispatchEnabled() { return false; } @@ -281,6 +284,7 @@ enum ExperimentIds { kExperimentIdTraceRecordCallops, kExperimentIdUnconstrainedMaxQuotaBufferSize, kExperimentIdV3ChannelIdleFilters, + kExperimentIdV3CompressionFilter, kExperimentIdWorkSerializerClearsTimeCache, kExperimentIdWorkSerializerDispatch, kExperimentIdWriteSizePolicy, @@ -424,6 +428,10 @@ inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { inline bool IsV3ChannelIdleFiltersEnabled() { return IsExperimentEnabled(kExperimentIdV3ChannelIdleFilters); } +#define GRPC_EXPERIMENT_IS_INCLUDED_V3_COMPRESSION_FILTER +inline bool IsV3CompressionFilterEnabled() { + return IsExperimentEnabled(kExperimentIdV3CompressionFilter); +} #define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE inline bool IsWorkSerializerClearsTimeCacheEnabled() { return IsExperimentEnabled(kExperimentIdWorkSerializerClearsTimeCache); diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index d42e14ee34a..8fd03836ea9 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -259,6 +259,12 @@ expiry: 2024/04/04 owner: ctiller@google.com test_tags: [] +- name: v3_compression_filter + description: + Use the compression filter utilizing the v3 filter api + expiry: 2024/04/04 + owner: ctiller@google.com + test_tags: ["compression_test"] - name: work_serializer_clears_time_cache description: Have the work serializer clear the time cache when it dispatches work. diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index c0439924822..9c2ba0008a5 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -89,6 +89,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/http/http_filters_plugin.cc', 'src/core/ext/filters/http/message_compress/compression_filter.cc', + 'src/core/ext/filters/http/message_compress/legacy_compression_filter.cc', 'src/core/ext/filters/http/server/http_server_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', 'src/core/ext/filters/rbac/rbac_filter.cc', diff --git a/test/core/end2end/BUILD b/test/core/end2end/BUILD index 01ebf2b9615..98cbf78d953 100644 --- a/test/core/end2end/BUILD +++ b/test/core/end2end/BUILD @@ -286,7 +286,10 @@ grpc_core_end2end_test(name = "channelz") grpc_core_end2end_test(name = "client_streaming") -grpc_core_end2end_test(name = "compressed_payload") +grpc_core_end2end_test( + name = "compressed_payload", + tags = ["compression_test"], +) grpc_core_end2end_test(name = "connectivity") diff --git a/test/core/end2end/grpc_core_end2end_test.bzl b/test/core/end2end/grpc_core_end2end_test.bzl index 16f193bc197..9b1224db84a 100644 --- a/test/core/end2end/grpc_core_end2end_test.bzl +++ b/test/core/end2end/grpc_core_end2end_test.bzl @@ -25,7 +25,7 @@ END2END_TEST_DATA = [ "//src/core/tsi/test_creds:server1.pem", ] -def grpc_core_end2end_test(name, shard_count = 10): +def grpc_core_end2end_test(name, shard_count = 10, tags = []): if len(name) > 60: fail("test name %s too long" % name) @@ -45,7 +45,7 @@ def grpc_core_end2end_test(name, shard_count = 10): "absl/types:optional", "gtest", ], - tags = ["core_end2end_test"], + tags = ["core_end2end_test"] + tags, deps = [ "end2end_test_main", "cq_verifier", diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index b465d10a271..a173512c3ce 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1215,6 +1215,8 @@ src/core/ext/filters/http/client_authority_filter.h \ src/core/ext/filters/http/http_filters_plugin.cc \ src/core/ext/filters/http/message_compress/compression_filter.cc \ src/core/ext/filters/http/message_compress/compression_filter.h \ +src/core/ext/filters/http/message_compress/legacy_compression_filter.cc \ +src/core/ext/filters/http/message_compress/legacy_compression_filter.h \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/http/server/http_server_filter.h \ src/core/ext/filters/message_size/message_size_filter.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 7a3ea87d2ac..ae89c4707cc 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1024,6 +1024,8 @@ src/core/ext/filters/http/client_authority_filter.h \ src/core/ext/filters/http/http_filters_plugin.cc \ src/core/ext/filters/http/message_compress/compression_filter.cc \ src/core/ext/filters/http/message_compress/compression_filter.h \ +src/core/ext/filters/http/message_compress/legacy_compression_filter.cc \ +src/core/ext/filters/http/message_compress/legacy_compression_filter.h \ src/core/ext/filters/http/server/http_server_filter.cc \ src/core/ext/filters/http/server/http_server_filter.h \ src/core/ext/filters/message_size/message_size_filter.cc \