[call-v3] Convert message compression filter (#35269)

Closes #35269

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35269 from ctiller:cg-compress 0304821f07
PiperOrigin-RevId: 590010564
pull/35278/head^2
Craig Tiller 1 year ago committed by Copybara-Service
parent a7e90045db
commit ff63ad9413
  1. 3
      BUILD
  2. 2
      CMakeLists.txt
  3. 2
      Makefile
  4. 2
      Package.swift
  5. 10
      bazel/experiments.bzl
  6. 4
      build_autogenerated.yaml
  7. 1
      config.m4
  8. 1
      config.w32
  9. 2
      gRPC-C++.podspec
  10. 3
      gRPC-Core.podspec
  11. 2
      grpc.gemspec
  12. 2
      grpc.gyp
  13. 2
      package.xml
  14. 48
      src/core/ext/filters/http/http_filters_plugin.cc
  15. 135
      src/core/ext/filters/http/message_compress/compression_filter.cc
  16. 66
      src/core/ext/filters/http/message_compress/compression_filter.h
  17. 325
      src/core/ext/filters/http/message_compress/legacy_compression_filter.cc
  18. 139
      src/core/ext/filters/http/message_compress/legacy_compression_filter.h
  19. 182
      src/core/lib/channel/promise_based_filter.h
  20. 15
      src/core/lib/experiments/experiments.cc
  21. 8
      src/core/lib/experiments/experiments.h
  22. 6
      src/core/lib/experiments/experiments.yaml
  23. 1
      src/python/grpcio/grpc_core_dependencies.py
  24. 5
      test/core/end2end/BUILD
  25. 4
      test/core/end2end/grpc_core_end2end_test.bzl
  26. 2
      tools/doxygen/Doxyfile.c++.internal
  27. 2
      tools/doxygen/Doxyfile.core.internal

@ -3590,11 +3590,13 @@ grpc_cc_library(
"//src/core:ext/filters/http/client/http_client_filter.cc",
"//src/core:ext/filters/http/http_filters_plugin.cc",
"//src/core:ext/filters/http/message_compress/compression_filter.cc",
"//src/core:ext/filters/http/message_compress/legacy_compression_filter.cc",
"//src/core:ext/filters/http/server/http_server_filter.cc",
],
hdrs = [
"//src/core:ext/filters/http/client/http_client_filter.h",
"//src/core:ext/filters/http/message_compress/compression_filter.h",
"//src/core:ext/filters/http/message_compress/legacy_compression_filter.h",
"//src/core:ext/filters/http/server/http_server_filter.h",
],
external_deps = [
@ -3623,6 +3625,7 @@ grpc_cc_library(
"//src/core:channel_fwd",
"//src/core:channel_stack_type",
"//src/core:context",
"//src/core:experiments",
"//src/core:grpc_message_size_filter",
"//src/core:latch",
"//src/core:map",

2
CMakeLists.txt generated

@ -1826,6 +1826,7 @@ add_library(grpc
src/core/ext/filters/http/client_authority_filter.cc
src/core/ext/filters/http/http_filters_plugin.cc
src/core/ext/filters/http/message_compress/compression_filter.cc
src/core/ext/filters/http/message_compress/legacy_compression_filter.cc
src/core/ext/filters/http/server/http_server_filter.cc
src/core/ext/filters/message_size/message_size_filter.cc
src/core/ext/filters/rbac/rbac_filter.cc
@ -2882,6 +2883,7 @@ add_library(grpc_unsecure
src/core/ext/filters/http/client_authority_filter.cc
src/core/ext/filters/http/http_filters_plugin.cc
src/core/ext/filters/http/message_compress/compression_filter.cc
src/core/ext/filters/http/message_compress/legacy_compression_filter.cc
src/core/ext/filters/http/server/http_server_filter.cc
src/core/ext/filters/message_size/message_size_filter.cc
src/core/ext/transport/chttp2/client/chttp2_connector.cc

2
Makefile generated

@ -1028,6 +1028,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/http/client_authority_filter.cc \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/compression_filter.cc \
src/core/ext/filters/http/message_compress/legacy_compression_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/rbac/rbac_filter.cc \
@ -1934,6 +1935,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/http/client_authority_filter.cc \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/compression_filter.cc \
src/core/ext/filters/http/message_compress/legacy_compression_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/transport/chttp2/client/chttp2_connector.cc \

2
Package.swift generated

@ -249,6 +249,8 @@ let package = Package(
"src/core/ext/filters/http/http_filters_plugin.cc",
"src/core/ext/filters/http/message_compress/compression_filter.cc",
"src/core/ext/filters/http/message_compress/compression_filter.h",
"src/core/ext/filters/http/message_compress/legacy_compression_filter.cc",
"src/core/ext/filters/http/message_compress/legacy_compression_filter.h",
"src/core/ext/filters/http/server/http_server_filter.cc",
"src/core/ext/filters/http/server/http_server_filter.h",
"src/core/ext/filters/message_size/message_size_filter.cc",

@ -51,6 +51,7 @@ EXPERIMENT_ENABLES = {
"trace_record_callops": "trace_record_callops",
"unconstrained_max_quota_buffer_size": "unconstrained_max_quota_buffer_size",
"v3_channel_idle_filters": "v3_channel_idle_filters",
"v3_compression_filter": "v3_compression_filter",
"work_serializer_clears_time_cache": "work_serializer_clears_time_cache",
"work_serializer_dispatch": "work_serializer_dispatch",
"write_size_policy": "write_size_policy",
@ -66,6 +67,9 @@ EXPERIMENTS = {
"bad_client_test": [
"rfc_max_concurrent_streams",
],
"compression_test": [
"v3_compression_filter",
],
"core_end2end_test": [
"promise_based_client_call",
"promise_based_server_call",
@ -145,6 +149,9 @@ EXPERIMENTS = {
"bad_client_test": [
"rfc_max_concurrent_streams",
],
"compression_test": [
"v3_compression_filter",
],
"core_end2end_test": [
"promise_based_client_call",
"promise_based_server_call",
@ -221,6 +228,9 @@ EXPERIMENTS = {
"cancel_ares_query_test": [
"event_engine_dns",
],
"compression_test": [
"v3_compression_filter",
],
"core_end2end_test": [
"event_engine_client",
"promise_based_client_call",

@ -276,6 +276,7 @@ libs:
- src/core/ext/filters/http/client/http_client_filter.h
- src/core/ext/filters/http/client_authority_filter.h
- src/core/ext/filters/http/message_compress/compression_filter.h
- src/core/ext/filters/http/message_compress/legacy_compression_filter.h
- src/core/ext/filters/http/server/http_server_filter.h
- src/core/ext/filters/message_size/message_size_filter.h
- src/core/ext/filters/rbac/rbac_filter.h
@ -1280,6 +1281,7 @@ libs:
- src/core/ext/filters/http/client_authority_filter.cc
- src/core/ext/filters/http/http_filters_plugin.cc
- src/core/ext/filters/http/message_compress/compression_filter.cc
- src/core/ext/filters/http/message_compress/legacy_compression_filter.cc
- src/core/ext/filters/http/server/http_server_filter.cc
- src/core/ext/filters/message_size/message_size_filter.cc
- src/core/ext/filters/rbac/rbac_filter.cc
@ -2210,6 +2212,7 @@ libs:
- src/core/ext/filters/http/client/http_client_filter.h
- src/core/ext/filters/http/client_authority_filter.h
- src/core/ext/filters/http/message_compress/compression_filter.h
- src/core/ext/filters/http/message_compress/legacy_compression_filter.h
- src/core/ext/filters/http/server/http_server_filter.h
- src/core/ext/filters/message_size/message_size_filter.h
- src/core/ext/transport/chttp2/client/chttp2_connector.h
@ -2689,6 +2692,7 @@ libs:
- src/core/ext/filters/http/client_authority_filter.cc
- src/core/ext/filters/http/http_filters_plugin.cc
- src/core/ext/filters/http/message_compress/compression_filter.cc
- src/core/ext/filters/http/message_compress/legacy_compression_filter.cc
- src/core/ext/filters/http/server/http_server_filter.cc
- src/core/ext/filters/message_size/message_size_filter.cc
- src/core/ext/transport/chttp2/client/chttp2_connector.cc

1
config.m4 generated

@ -115,6 +115,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/http/client_authority_filter.cc \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/compression_filter.cc \
src/core/ext/filters/http/message_compress/legacy_compression_filter.cc \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/rbac/rbac_filter.cc \

1
config.w32 generated

@ -80,6 +80,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\http\\client_authority_filter.cc " +
"src\\core\\ext\\filters\\http\\http_filters_plugin.cc " +
"src\\core\\ext\\filters\\http\\message_compress\\compression_filter.cc " +
"src\\core\\ext\\filters\\http\\message_compress\\legacy_compression_filter.cc " +
"src\\core\\ext\\filters\\http\\server\\http_server_filter.cc " +
"src\\core\\ext\\filters\\message_size\\message_size_filter.cc " +
"src\\core\\ext\\filters\\rbac\\rbac_filter.cc " +

2
gRPC-C++.podspec generated

@ -309,6 +309,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/client/http_client_filter.h',
'src/core/ext/filters/http/client_authority_filter.h',
'src/core/ext/filters/http/message_compress/compression_filter.h',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.h',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/message_size/message_size_filter.h',
'src/core/ext/filters/rbac/rbac_filter.h',
@ -1562,6 +1563,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/client/http_client_filter.h',
'src/core/ext/filters/http/client_authority_filter.h',
'src/core/ext/filters/http/message_compress/compression_filter.h',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.h',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/message_size/message_size_filter.h',
'src/core/ext/filters/rbac/rbac_filter.h',

3
gRPC-Core.podspec generated

@ -352,6 +352,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/compression_filter.cc',
'src/core/ext/filters/http/message_compress/compression_filter.h',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.cc',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.h',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/message_size/message_size_filter.cc',
@ -2346,6 +2348,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/http/client/http_client_filter.h',
'src/core/ext/filters/http/client_authority_filter.h',
'src/core/ext/filters/http/message_compress/compression_filter.h',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.h',
'src/core/ext/filters/http/server/http_server_filter.h',
'src/core/ext/filters/message_size/message_size_filter.h',
'src/core/ext/filters/rbac/rbac_filter.h',

2
grpc.gemspec generated

@ -255,6 +255,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/http/http_filters_plugin.cc )
s.files += %w( src/core/ext/filters/http/message_compress/compression_filter.cc )
s.files += %w( src/core/ext/filters/http/message_compress/compression_filter.h )
s.files += %w( src/core/ext/filters/http/message_compress/legacy_compression_filter.cc )
s.files += %w( src/core/ext/filters/http/message_compress/legacy_compression_filter.h )
s.files += %w( src/core/ext/filters/http/server/http_server_filter.cc )
s.files += %w( src/core/ext/filters/http/server/http_server_filter.h )
s.files += %w( src/core/ext/filters/message_size/message_size_filter.cc )

2
grpc.gyp generated

@ -348,6 +348,7 @@
'src/core/ext/filters/http/client_authority_filter.cc',
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/compression_filter.cc',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/rbac/rbac_filter.cc',
@ -1196,6 +1197,7 @@
'src/core/ext/filters/http/client_authority_filter.cc',
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/compression_filter.cc',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/transport/chttp2/client/chttp2_connector.cc',

2
package.xml generated

@ -237,6 +237,8 @@
<file baseinstalldir="/" name="src/core/ext/filters/http/http_filters_plugin.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/compression_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/compression_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/legacy_compression_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/legacy_compression_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/server/http_server_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/server/http_server_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/message_size/message_size_filter.cc" role="src" />

@ -22,10 +22,12 @@
#include "src/core/ext/filters/http/client/http_client_filter.h"
#include "src/core/ext/filters/http/message_compress/compression_filter.h"
#include "src/core/ext/filters/http/message_compress/legacy_compression_filter.h"
#include "src/core/ext/filters/http/server/http_server_filter.h"
#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/transport.h"
@ -38,20 +40,38 @@ bool IsBuildingHttpLikeTransport(const ChannelArgs& args) {
} // namespace
void RegisterHttpFilters(CoreConfiguration::Builder* builder) {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&ClientCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&ClientCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpServerFilter::kFilter, &ServerMessageSizeFilter::kFilter});
if (IsV3CompressionFilterEnabled()) {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&ClientCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&ClientCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &ServerCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpServerFilter::kFilter, &ServerMessageSizeFilter::kFilter});
} else {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&LegacyClientCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
&LegacyClientCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpClientFilter::kFilter, &ClientMessageSizeFilter::kFilter});
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL,
&LegacyServerCompressionFilter::kFilter)
.If(IsBuildingHttpLikeTransport)
.After({&HttpServerFilter::kFilter, &ServerMessageSizeFilter::kFilter});
}
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &HttpClientFilter::kFilter)
.If(IsBuildingHttpLikeTransport)

@ -56,6 +56,11 @@
namespace grpc_core {
const NoInterceptor ServerCompressionFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ServerCompressionFilter::Call::OnFinalize;
const NoInterceptor ClientCompressionFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ClientCompressionFilter::Call::OnFinalize;
const grpc_channel_filter ClientCompressionFilter::kFilter =
MakePromiseBasedFilter<ClientCompressionFilter, FilterEndpoint::kClient,
kFilterExaminesServerInitialMetadata |
@ -77,7 +82,7 @@ absl::StatusOr<ServerCompressionFilter> ServerCompressionFilter::Create(
return ServerCompressionFilter(args);
}
CompressionFilter::CompressionFilter(const ChannelArgs& args)
ChannelCompression::ChannelCompression(const ChannelArgs& args)
: max_recv_size_(GetMaxRecvSizeFromChannelArgs(args)),
message_size_service_config_parser_index_(
MessageSizeParser::ParserIndex()),
@ -105,7 +110,7 @@ CompressionFilter::CompressionFilter(const ChannelArgs& args)
}
}
MessageHandle CompressionFilter::CompressMessage(
MessageHandle ChannelCompression::CompressMessage(
MessageHandle message, grpc_compression_algorithm algorithm) const {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
gpr_log(GPR_INFO, "CompressMessage: len=%" PRIdPTR " alg=%d flags=%d",
@ -163,7 +168,7 @@ MessageHandle CompressionFilter::CompressMessage(
return message;
}
absl::StatusOr<MessageHandle> CompressionFilter::DecompressMessage(
absl::StatusOr<MessageHandle> ChannelCompression::DecompressMessage(
MessageHandle message, DecompressArgs args) const {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
gpr_log(GPR_INFO, "DecompressMessage: len=%" PRIdPTR " max=%d alg=%d",
@ -208,7 +213,7 @@ absl::StatusOr<MessageHandle> CompressionFilter::DecompressMessage(
return std::move(message);
}
grpc_compression_algorithm CompressionFilter::HandleOutgoingMetadata(
grpc_compression_algorithm ChannelCompression::HandleOutgoingMetadata(
grpc_metadata_batch& outgoing_metadata) {
const auto algorithm = outgoing_metadata.Take(GrpcInternalEncodingRequest())
.value_or(default_compression_algorithm());
@ -221,7 +226,7 @@ grpc_compression_algorithm CompressionFilter::HandleOutgoingMetadata(
return algorithm;
}
CompressionFilter::DecompressArgs CompressionFilter::HandleIncomingMetadata(
ChannelCompression::DecompressArgs ChannelCompression::HandleIncomingMetadata(
const grpc_metadata_batch& incoming_metadata) {
// Configure max receive size.
auto max_recv_message_length = max_recv_size_;
@ -232,89 +237,59 @@ CompressionFilter::DecompressArgs CompressionFilter::HandleIncomingMetadata(
if (limits != nullptr && limits->max_recv_size().has_value() &&
(!max_recv_message_length.has_value() ||
*limits->max_recv_size() < *max_recv_message_length)) {
max_recv_message_length = *limits->max_recv_size();
max_recv_message_length = limits->max_recv_size();
}
return DecompressArgs{incoming_metadata.get(GrpcEncodingMetadata())
.value_or(GRPC_COMPRESS_NONE),
max_recv_message_length};
}
ArenaPromise<ServerMetadataHandle> ClientCompressionFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto compression_algorithm =
HandleOutgoingMetadata(*call_args.client_initial_metadata);
call_args.client_to_server_messages->InterceptAndMap(
[compression_algorithm,
this](MessageHandle message) -> absl::optional<MessageHandle> {
return CompressMessage(std::move(message), compression_algorithm);
});
auto* decompress_args = GetContext<Arena>()->New<DecompressArgs>(
DecompressArgs{GRPC_COMPRESS_ALGORITHMS_COUNT, absl::nullopt});
auto* decompress_err =
GetContext<Arena>()->New<Latch<ServerMetadataHandle>>();
call_args.server_initial_metadata->InterceptAndMap(
[decompress_args, this](ServerMetadataHandle server_initial_metadata)
-> absl::optional<ServerMetadataHandle> {
if (server_initial_metadata == nullptr) return absl::nullopt;
*decompress_args = HandleIncomingMetadata(*server_initial_metadata);
return std::move(server_initial_metadata);
});
call_args.server_to_client_messages->InterceptAndMap(
[decompress_err, decompress_args,
this](MessageHandle message) -> absl::optional<MessageHandle> {
auto r = DecompressMessage(std::move(message), *decompress_args);
if (!r.ok()) {
decompress_err->Set(ServerMetadataFromStatus(r.status()));
return absl::nullopt;
}
return std::move(*r);
});
// Run the next filter, and race it with getting an error from decompression.
return PrioritizedRace(decompress_err->Wait(),
next_promise_factory(std::move(call_args)));
void ClientCompressionFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, ClientCompressionFilter* filter) {
compression_algorithm_ =
filter->compression_engine_.HandleOutgoingMetadata(md);
}
MessageHandle ClientCompressionFilter::Call::OnClientToServerMessage(
MessageHandle message, ClientCompressionFilter* filter) {
return filter->compression_engine_.CompressMessage(std::move(message),
compression_algorithm_);
}
void ClientCompressionFilter::Call::OnServerInitialMetadata(
ServerMetadata& md, ClientCompressionFilter* filter) {
decompress_args_ = filter->compression_engine_.HandleIncomingMetadata(md);
}
absl::StatusOr<MessageHandle>
ClientCompressionFilter::Call::OnServerToClientMessage(
MessageHandle message, ClientCompressionFilter* filter) {
return filter->compression_engine_.DecompressMessage(std::move(message),
decompress_args_);
}
void ServerCompressionFilter::Call::OnClientInitialMetadata(
ClientMetadata& md, ServerCompressionFilter* filter) {
decompress_args_ = filter->compression_engine_.HandleIncomingMetadata(md);
}
absl::StatusOr<MessageHandle>
ServerCompressionFilter::Call::OnClientToServerMessage(
MessageHandle message, ServerCompressionFilter* filter) {
return filter->compression_engine_.DecompressMessage(std::move(message),
decompress_args_);
}
void ServerCompressionFilter::Call::OnServerInitialMetadata(
ServerMetadata& md, ServerCompressionFilter* filter) {
compression_algorithm_ =
filter->compression_engine_.HandleOutgoingMetadata(md);
}
ArenaPromise<ServerMetadataHandle> ServerCompressionFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto decompress_args =
HandleIncomingMetadata(*call_args.client_initial_metadata);
auto* decompress_err =
GetContext<Arena>()->New<Latch<ServerMetadataHandle>>();
call_args.client_to_server_messages->InterceptAndMap(
[decompress_err, decompress_args,
this](MessageHandle message) -> absl::optional<MessageHandle> {
auto r = DecompressMessage(std::move(message), decompress_args);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[compression] DecompressMessage returned %s",
Activity::current()->DebugTag().c_str(),
r.status().ToString().c_str());
}
if (!r.ok()) {
decompress_err->Set(ServerMetadataFromStatus(r.status()));
return absl::nullopt;
}
return std::move(*r);
});
auto* compression_algorithm =
GetContext<Arena>()->New<grpc_compression_algorithm>();
call_args.server_initial_metadata->InterceptAndMap(
[this, compression_algorithm](ServerMetadataHandle md) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%s[compression] Write metadata",
Activity::current()->DebugTag().c_str());
}
// Find the compression algorithm.
*compression_algorithm = HandleOutgoingMetadata(*md);
return md;
});
call_args.server_to_client_messages->InterceptAndMap(
[compression_algorithm,
this](MessageHandle message) -> absl::optional<MessageHandle> {
return CompressMessage(std::move(message), *compression_algorithm);
});
// Run the next filter, and race it with getting an error from decompression.
return PrioritizedRace(decompress_err->Wait(),
next_promise_factory(std::move(call_args)));
MessageHandle ServerCompressionFilter::Call::OnServerToClientMessage(
MessageHandle message, ServerCompressionFilter* filter) {
return filter->compression_engine_.CompressMessage(std::move(message),
compression_algorithm_);
}
} // namespace grpc_core

@ -61,15 +61,15 @@ namespace grpc_core {
/// the aforementioned 'grpc-encoding' metadata value, data will pass through
/// uncompressed.
class CompressionFilter : public ChannelFilter {
protected:
class ChannelCompression {
public:
explicit ChannelCompression(const ChannelArgs& args);
struct DecompressArgs {
grpc_compression_algorithm algorithm;
absl::optional<uint32_t> max_recv_message_length;
};
explicit CompressionFilter(const ChannelArgs& args);
grpc_compression_algorithm default_compression_algorithm() const {
return default_compression_algorithm_;
}
@ -104,7 +104,8 @@ class CompressionFilter : public ChannelFilter {
bool enable_decompression_;
};
class ClientCompressionFilter final : public CompressionFilter {
class ClientCompressionFilter final
: public ImplementChannelFilter<ClientCompressionFilter> {
public:
static const grpc_channel_filter kFilter;
@ -112,14 +113,35 @@ class ClientCompressionFilter final : public CompressionFilter {
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& md,
ClientCompressionFilter* filter);
MessageHandle OnClientToServerMessage(MessageHandle message,
ClientCompressionFilter* filter);
void OnServerInitialMetadata(ServerMetadata& md,
ClientCompressionFilter* filter);
absl::StatusOr<MessageHandle> OnServerToClientMessage(
MessageHandle message, ClientCompressionFilter* filter);
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
private:
grpc_compression_algorithm compression_algorithm_;
ChannelCompression::DecompressArgs decompress_args_;
};
private:
using CompressionFilter::CompressionFilter;
explicit ClientCompressionFilter(const ChannelArgs& args)
: compression_engine_(args) {}
ChannelCompression compression_engine_;
};
class ServerCompressionFilter final : public CompressionFilter {
class ServerCompressionFilter final
: public ImplementChannelFilter<ServerCompressionFilter> {
public:
static const grpc_channel_filter kFilter;
@ -127,11 +149,31 @@ class ServerCompressionFilter final : public CompressionFilter {
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& md,
ServerCompressionFilter* filter);
absl::StatusOr<MessageHandle> OnClientToServerMessage(
MessageHandle message, ServerCompressionFilter* filter);
void OnServerInitialMetadata(ServerMetadata& md,
ServerCompressionFilter* filter);
MessageHandle OnServerToClientMessage(MessageHandle message,
ServerCompressionFilter* filter);
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
private:
ChannelCompression::DecompressArgs decompress_args_;
grpc_compression_algorithm compression_algorithm_;
};
private:
using CompressionFilter::CompressionFilter;
explicit ServerCompressionFilter(const ChannelArgs& args)
: compression_engine_(args) {}
ChannelCompression compression_engine_;
};
} // namespace grpc_core

@ -0,0 +1,325 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/http/message_compress/legacy_compression_filter.h"
#include <inttypes.h>
#include <functional>
#include <memory>
#include <utility>
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/types/optional.h"
#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/impl/compression_types.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/prioritized_race.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/call_trace.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
const grpc_channel_filter LegacyClientCompressionFilter::kFilter =
MakePromiseBasedFilter<
LegacyClientCompressionFilter, FilterEndpoint::kClient,
kFilterExaminesServerInitialMetadata | kFilterExaminesInboundMessages |
kFilterExaminesOutboundMessages>("compression");
const grpc_channel_filter LegacyServerCompressionFilter::kFilter =
MakePromiseBasedFilter<
LegacyServerCompressionFilter, FilterEndpoint::kServer,
kFilterExaminesServerInitialMetadata | kFilterExaminesInboundMessages |
kFilterExaminesOutboundMessages>("compression");
absl::StatusOr<LegacyClientCompressionFilter>
LegacyClientCompressionFilter::Create(const ChannelArgs& args,
ChannelFilter::Args) {
return LegacyClientCompressionFilter(args);
}
absl::StatusOr<LegacyServerCompressionFilter>
LegacyServerCompressionFilter::Create(const ChannelArgs& args,
ChannelFilter::Args) {
return LegacyServerCompressionFilter(args);
}
LegacyCompressionFilter::LegacyCompressionFilter(const ChannelArgs& args)
: max_recv_size_(GetMaxRecvSizeFromChannelArgs(args)),
message_size_service_config_parser_index_(
MessageSizeParser::ParserIndex()),
default_compression_algorithm_(
DefaultCompressionAlgorithmFromChannelArgs(args).value_or(
GRPC_COMPRESS_NONE)),
enabled_compression_algorithms_(
CompressionAlgorithmSet::FromChannelArgs(args)),
enable_compression_(
args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION).value_or(true)),
enable_decompression_(
args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION)
.value_or(true)) {
// Make sure the default is enabled.
if (!enabled_compression_algorithms_.IsSet(default_compression_algorithm_)) {
const char* name;
if (!grpc_compression_algorithm_name(default_compression_algorithm_,
&name)) {
name = "<unknown>";
}
gpr_log(GPR_ERROR,
"default compression algorithm %s not enabled: switching to none",
name);
default_compression_algorithm_ = GRPC_COMPRESS_NONE;
}
}
MessageHandle LegacyCompressionFilter::CompressMessage(
MessageHandle message, grpc_compression_algorithm algorithm) const {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
gpr_log(GPR_INFO, "CompressMessage: len=%" PRIdPTR " alg=%d flags=%d",
message->payload()->Length(), algorithm, message->flags());
}
auto* call_context = GetContext<grpc_call_context_element>();
auto* call_tracer = static_cast<CallTracerInterface*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
if (call_tracer != nullptr) {
call_tracer->RecordSendMessage(*message->payload());
}
// Check if we're allowed to compress this message
// (apps might want to disable compression for certain messages to avoid
// crime/beast like vulns).
uint32_t& flags = message->mutable_flags();
if (algorithm == GRPC_COMPRESS_NONE || !enable_compression_ ||
(flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS))) {
return message;
}
// Try to compress the payload.
SliceBuffer tmp;
SliceBuffer* payload = message->payload();
bool did_compress = grpc_msg_compress(algorithm, payload->c_slice_buffer(),
tmp.c_slice_buffer());
// If we achieved compression send it as compressed, otherwise send it as (to
// avoid spending cycles on the receiver decompressing).
if (did_compress) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
const char* algo_name;
const size_t before_size = payload->Length();
const size_t after_size = tmp.Length();
const float savings_ratio = 1.0f - static_cast<float>(after_size) /
static_cast<float>(before_size);
GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name));
gpr_log(GPR_INFO,
"Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR
" bytes (%.2f%% savings)",
algo_name, before_size, after_size, 100 * savings_ratio);
}
tmp.Swap(payload);
flags |= GRPC_WRITE_INTERNAL_COMPRESS;
if (call_tracer != nullptr) {
call_tracer->RecordSendCompressedMessage(*message->payload());
}
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
const char* algo_name;
GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name));
gpr_log(GPR_INFO,
"Algorithm '%s' enabled but decided not to compress. Input size: "
"%" PRIuPTR,
algo_name, payload->Length());
}
}
return message;
}
absl::StatusOr<MessageHandle> LegacyCompressionFilter::DecompressMessage(
MessageHandle message, DecompressArgs args) const {
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
gpr_log(GPR_INFO, "DecompressMessage: len=%" PRIdPTR " max=%d alg=%d",
message->payload()->Length(),
args.max_recv_message_length.value_or(-1), args.algorithm);
}
auto* call_context = GetContext<grpc_call_context_element>();
auto* call_tracer = static_cast<CallTracerInterface*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
if (call_tracer != nullptr) {
call_tracer->RecordReceivedMessage(*message->payload());
}
// Check max message length.
if (args.max_recv_message_length.has_value() &&
message->payload()->Length() >
static_cast<size_t>(*args.max_recv_message_length)) {
return absl::ResourceExhaustedError(absl::StrFormat(
"Received message larger than max (%u vs. %d)",
message->payload()->Length(), *args.max_recv_message_length));
}
// Check if decompression is enabled (if not, we can just pass the message
// up).
if (!enable_decompression_ ||
(message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) == 0) {
return std::move(message);
}
// Try to decompress the payload.
SliceBuffer decompressed_slices;
if (grpc_msg_decompress(args.algorithm, message->payload()->c_slice_buffer(),
decompressed_slices.c_slice_buffer()) == 0) {
return absl::InternalError(
absl::StrCat("Unexpected error decompressing data for algorithm ",
CompressionAlgorithmAsString(args.algorithm)));
}
// Swap the decompressed slices into the message.
message->payload()->Swap(&decompressed_slices);
message->mutable_flags() &= ~GRPC_WRITE_INTERNAL_COMPRESS;
message->mutable_flags() |= GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED;
if (call_tracer != nullptr) {
call_tracer->RecordReceivedDecompressedMessage(*message->payload());
}
return std::move(message);
}
grpc_compression_algorithm LegacyCompressionFilter::HandleOutgoingMetadata(
grpc_metadata_batch& outgoing_metadata) {
const auto algorithm = outgoing_metadata.Take(GrpcInternalEncodingRequest())
.value_or(default_compression_algorithm());
// Convey supported compression algorithms.
outgoing_metadata.Set(GrpcAcceptEncodingMetadata(),
enabled_compression_algorithms());
if (algorithm != GRPC_COMPRESS_NONE) {
outgoing_metadata.Set(GrpcEncodingMetadata(), algorithm);
}
return algorithm;
}
LegacyCompressionFilter::DecompressArgs
LegacyCompressionFilter::HandleIncomingMetadata(
const grpc_metadata_batch& incoming_metadata) {
// Configure max receive size.
auto max_recv_message_length = max_recv_size_;
const MessageSizeParsedConfig* limits =
MessageSizeParsedConfig::GetFromCallContext(
GetContext<grpc_call_context_element>(),
message_size_service_config_parser_index_);
if (limits != nullptr && limits->max_recv_size().has_value() &&
(!max_recv_message_length.has_value() ||
*limits->max_recv_size() < *max_recv_message_length)) {
max_recv_message_length = *limits->max_recv_size();
}
return DecompressArgs{incoming_metadata.get(GrpcEncodingMetadata())
.value_or(GRPC_COMPRESS_NONE),
max_recv_message_length};
}
ArenaPromise<ServerMetadataHandle>
LegacyClientCompressionFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto compression_algorithm =
HandleOutgoingMetadata(*call_args.client_initial_metadata);
call_args.client_to_server_messages->InterceptAndMap(
[compression_algorithm,
this](MessageHandle message) -> absl::optional<MessageHandle> {
return CompressMessage(std::move(message), compression_algorithm);
});
auto* decompress_args = GetContext<Arena>()->New<DecompressArgs>(
DecompressArgs{GRPC_COMPRESS_ALGORITHMS_COUNT, absl::nullopt});
auto* decompress_err =
GetContext<Arena>()->New<Latch<ServerMetadataHandle>>();
call_args.server_initial_metadata->InterceptAndMap(
[decompress_args, this](ServerMetadataHandle server_initial_metadata)
-> absl::optional<ServerMetadataHandle> {
if (server_initial_metadata == nullptr) return absl::nullopt;
*decompress_args = HandleIncomingMetadata(*server_initial_metadata);
return std::move(server_initial_metadata);
});
call_args.server_to_client_messages->InterceptAndMap(
[decompress_err, decompress_args,
this](MessageHandle message) -> absl::optional<MessageHandle> {
auto r = DecompressMessage(std::move(message), *decompress_args);
if (!r.ok()) {
decompress_err->Set(ServerMetadataFromStatus(r.status()));
return absl::nullopt;
}
return std::move(*r);
});
// Run the next filter, and race it with getting an error from decompression.
return PrioritizedRace(decompress_err->Wait(),
next_promise_factory(std::move(call_args)));
}
ArenaPromise<ServerMetadataHandle>
LegacyServerCompressionFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto decompress_args =
HandleIncomingMetadata(*call_args.client_initial_metadata);
auto* decompress_err =
GetContext<Arena>()->New<Latch<ServerMetadataHandle>>();
call_args.client_to_server_messages->InterceptAndMap(
[decompress_err, decompress_args,
this](MessageHandle message) -> absl::optional<MessageHandle> {
auto r = DecompressMessage(std::move(message), decompress_args);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[compression] DecompressMessage returned %s",
Activity::current()->DebugTag().c_str(),
r.status().ToString().c_str());
}
if (!r.ok()) {
decompress_err->Set(ServerMetadataFromStatus(r.status()));
return absl::nullopt;
}
return std::move(*r);
});
auto* compression_algorithm =
GetContext<Arena>()->New<grpc_compression_algorithm>();
call_args.server_initial_metadata->InterceptAndMap(
[this, compression_algorithm](ServerMetadataHandle md) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%s[compression] Write metadata",
Activity::current()->DebugTag().c_str());
}
// Find the compression algorithm.
*compression_algorithm = HandleOutgoingMetadata(*md);
return md;
});
call_args.server_to_client_messages->InterceptAndMap(
[compression_algorithm,
this](MessageHandle message) -> absl::optional<MessageHandle> {
return CompressMessage(std::move(message), *compression_algorithm);
});
// Run the next filter, and race it with getting an error from decompression.
return PrioritizedRace(decompress_err->Wait(),
next_promise_factory(std::move(call_args)));
}
} // namespace grpc_core

@ -0,0 +1,139 @@
//
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_SRC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_LEGACY_COMPRESSION_FILTER_H
#define GRPC_SRC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_LEGACY_COMPRESSION_FILTER_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include <grpc/impl/compression_types.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
/// Compression filter for messages.
///
/// See <grpc/compression.h> for the available compression settings.
///
/// Compression settings may come from:
/// - Channel configuration, as established at channel creation time.
/// - The metadata accompanying the outgoing data to be compressed. This is
/// taken as a request only. We may choose not to honor it. The metadata key
/// is given by \a GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY.
///
/// Compression can be disabled for concrete messages (for instance in order to
/// prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set
/// in the MessageHandle flags.
///
/// The attempted compression mechanism is added to the resulting initial
/// metadata under the 'grpc-encoding' key.
///
/// If compression is actually performed, the MessageHandle's flag is modified
/// to incorporate GRPC_WRITE_INTERNAL_COMPRESS. Otherwise, and regardless of
/// the aforementioned 'grpc-encoding' metadata value, data will pass through
/// uncompressed.
class LegacyCompressionFilter : public ChannelFilter {
protected:
struct DecompressArgs {
grpc_compression_algorithm algorithm;
absl::optional<uint32_t> max_recv_message_length;
};
explicit LegacyCompressionFilter(const ChannelArgs& args);
grpc_compression_algorithm default_compression_algorithm() const {
return default_compression_algorithm_;
}
CompressionAlgorithmSet enabled_compression_algorithms() const {
return enabled_compression_algorithms_;
}
grpc_compression_algorithm HandleOutgoingMetadata(
grpc_metadata_batch& outgoing_metadata);
DecompressArgs HandleIncomingMetadata(
const grpc_metadata_batch& incoming_metadata);
// Compress one message synchronously.
MessageHandle CompressMessage(MessageHandle message,
grpc_compression_algorithm algorithm) const;
// Decompress one message synchronously.
absl::StatusOr<MessageHandle> DecompressMessage(MessageHandle message,
DecompressArgs args) const;
private:
// Max receive message length, if set.
absl::optional<uint32_t> max_recv_size_;
size_t message_size_service_config_parser_index_;
// The default, channel-level, compression algorithm.
grpc_compression_algorithm default_compression_algorithm_;
// Enabled compression algorithms.
CompressionAlgorithmSet enabled_compression_algorithms_;
// Is compression enabled?
bool enable_compression_;
// Is decompression enabled?
bool enable_decompression_;
};
class LegacyClientCompressionFilter final : public LegacyCompressionFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<LegacyClientCompressionFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
private:
using LegacyCompressionFilter::LegacyCompressionFilter;
};
class LegacyServerCompressionFilter final : public LegacyCompressionFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<LegacyServerCompressionFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
private:
using LegacyCompressionFilter::LegacyCompressionFilter;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_LEGACY_COMPRESSION_FILTER_H

@ -139,11 +139,21 @@ inline constexpr bool HasAsyncErrorInterceptor(const NoInterceptor*) {
return false;
}
template <typename T, typename A0, typename... As>
inline constexpr bool HasAsyncErrorInterceptor(A0 (T::*)(A0, As...)) {
return false;
}
template <typename T, typename... A>
inline constexpr bool HasAsyncErrorInterceptor(absl::Status (T::*)(A...)) {
return true;
}
template <typename R, typename T, typename... A>
inline constexpr bool HasAsyncErrorInterceptor(absl::StatusOr<R> (T::*)(A...)) {
return true;
}
template <typename T, typename... A>
inline constexpr bool HasAsyncErrorInterceptor(
ServerMetadataHandle (T::*)(A...)) {
@ -404,6 +414,34 @@ inline void InterceptClientToServerMessage(
});
}
template <typename Derived>
inline void InterceptClientToServerMessage(
MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_args.client_to_server_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
return call_data->call.OnClientToServerMessage(std::move(msg),
call_data->channel);
});
}
template <typename Derived>
inline void InterceptClientToServerMessage(
absl::StatusOr<MessageHandle> (Derived::Call::*fn)(MessageHandle, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_args.client_to_server_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call_data->call.OnClientToServerMessage(std::move(msg),
call_data->channel);
if (r.ok()) return std::move(*r);
if (call_data->error_latch.is_set()) return absl::nullopt;
call_data->error_latch.Set(ServerMetadataFromStatus(r.status()));
return absl::nullopt;
});
}
inline void InterceptClientToServerMessage(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
@ -412,7 +450,7 @@ inline void InterceptClientToServerMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, call_spine](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnClientToServerMessage(*msg);
if (return_md == nullptr) return std::move(msg);
@ -426,7 +464,7 @@ inline void InterceptClientToServerMessage(
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnClientToServerMessage(*msg, channel);
@ -435,6 +473,33 @@ inline void InterceptClientToServerMessage(
});
}
template <typename Derived>
inline void InterceptClientToServerMessage(
MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, channel](MessageHandle msg) {
return call->OnClientToServerMessage(std::move(msg), channel);
});
}
template <typename Derived>
inline void InterceptClientToServerMessage(
absl::StatusOr<MessageHandle> (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call->OnClientToServerMessage(std::move(msg), channel);
if (r.ok()) return std::move(*r);
return call_spine->Cancel(ServerMetadataFromStatus(r.status()));
});
}
inline void InterceptClientInitialMetadata(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
@ -526,6 +591,36 @@ inline void InterceptServerInitialMetadata(
});
}
template <typename Derived>
inline void InterceptServerInitialMetadata(
void (Derived::Call::*fn)(ServerMetadata&, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_args.server_initial_metadata->InterceptAndMap(
[call_data](ServerMetadataHandle md) {
call_data->call.OnServerInitialMetadata(*md, call_data->channel);
return md;
});
}
template <typename Derived>
inline void InterceptServerInitialMetadata(
absl::Status (Derived::Call::*fn)(ServerMetadata&, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_args.server_initial_metadata->InterceptAndMap(
[call_data](
ServerMetadataHandle md) -> absl::optional<ServerMetadataHandle> {
auto status =
call_data->call.OnServerInitialMetadata(*md, call_data->channel);
if (!status.ok() && !call_data->error_latch.is_set()) {
call_data->error_latch.Set(ServerMetadataFromStatus(status));
return absl::nullopt;
}
return std::move(md);
});
}
inline void InterceptServerInitialMetadata(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
@ -555,6 +650,34 @@ inline void InterceptServerInitialMetadata(
});
}
template <typename Derived>
inline void InterceptServerInitialMetadata(
void (Derived::Call::*fn)(ServerMetadata&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_spine->server_initial_metadata().sender.InterceptAndMap(
[call, channel](ServerMetadataHandle md) {
call->OnServerInitialMetadata(*md, channel);
return md;
});
}
template <typename Derived>
inline void InterceptServerInitialMetadata(
absl::Status (Derived::Call::*fn)(ServerMetadata&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_spine->server_initial_metadata().sender.InterceptAndMap(
[call, call_spine, channel](
ServerMetadataHandle md) -> absl::optional<ServerMetadataHandle> {
auto status = call->OnServerInitialMetadata(*md, channel);
if (status.ok()) return std::move(md);
return call_spine->Cancel(ServerMetadataFromStatus(status));
});
}
inline void InterceptServerToClientMessage(const NoInterceptor*, void*,
const CallArgs&) {}
@ -589,6 +712,34 @@ inline void InterceptServerToClientMessage(
});
}
template <typename Derived>
inline void InterceptServerToClientMessage(
MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_args.server_to_client_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
return call_data->call.OnServerToClientMessage(std::move(msg),
call_data->channel);
});
}
template <typename Derived>
inline void InterceptServerToClientMessage(
absl::StatusOr<MessageHandle> (Derived::Call::*fn)(MessageHandle, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_args.server_to_client_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call_data->call.OnServerToClientMessage(std::move(msg),
call_data->channel);
if (r.ok()) return std::move(*r);
if (call_data->error_latch.is_set()) return absl::nullopt;
call_data->error_latch.Set(ServerMetadataFromStatus(r.status()));
return absl::nullopt;
});
}
inline void InterceptServerToClientMessage(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
@ -620,6 +771,33 @@ inline void InterceptServerToClientMessage(
});
}
template <typename Derived>
inline void InterceptServerToClientMessage(
MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, channel](MessageHandle msg) {
return call->OnServerToClientMessage(std::move(msg), channel);
});
}
template <typename Derived>
inline void InterceptServerToClientMessage(
absl::StatusOr<MessageHandle> (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call->OnServerToClientMessage(std::move(msg), channel);
if (r.ok()) return std::move(*r);
return call_spine->Cancel(ServerMetadataFromStatus(r.status()));
});
}
inline void InterceptServerTrailingMetadata(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}

@ -155,6 +155,9 @@ const char* const additional_constraints_unconstrained_max_quota_buffer_size =
const char* const description_v3_channel_idle_filters =
"Use the v3 filter API version of the idle filters.";
const char* const additional_constraints_v3_channel_idle_filters = "{}";
const char* const description_v3_compression_filter =
"Use the compression filter utilizing the v3 filter api";
const char* const additional_constraints_v3_compression_filter = "{}";
const char* const description_work_serializer_clears_time_cache =
"Have the work serializer clear the time cache when it dispatches work.";
const char* const additional_constraints_work_serializer_clears_time_cache =
@ -269,6 +272,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
false, true},
{"v3_channel_idle_filters", description_v3_channel_idle_filters,
additional_constraints_v3_channel_idle_filters, nullptr, 0, false, true},
{"v3_compression_filter", description_v3_compression_filter,
additional_constraints_v3_compression_filter, nullptr, 0, false, true},
{"work_serializer_clears_time_cache",
description_work_serializer_clears_time_cache,
additional_constraints_work_serializer_clears_time_cache, nullptr, 0, true,
@ -419,6 +424,9 @@ const char* const additional_constraints_unconstrained_max_quota_buffer_size =
const char* const description_v3_channel_idle_filters =
"Use the v3 filter API version of the idle filters.";
const char* const additional_constraints_v3_channel_idle_filters = "{}";
const char* const description_v3_compression_filter =
"Use the compression filter utilizing the v3 filter api";
const char* const additional_constraints_v3_compression_filter = "{}";
const char* const description_work_serializer_clears_time_cache =
"Have the work serializer clear the time cache when it dispatches work.";
const char* const additional_constraints_work_serializer_clears_time_cache =
@ -533,6 +541,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
false, true},
{"v3_channel_idle_filters", description_v3_channel_idle_filters,
additional_constraints_v3_channel_idle_filters, nullptr, 0, false, true},
{"v3_compression_filter", description_v3_compression_filter,
additional_constraints_v3_compression_filter, nullptr, 0, false, true},
{"work_serializer_clears_time_cache",
description_work_serializer_clears_time_cache,
additional_constraints_work_serializer_clears_time_cache, nullptr, 0, true,
@ -683,6 +693,9 @@ const char* const additional_constraints_unconstrained_max_quota_buffer_size =
const char* const description_v3_channel_idle_filters =
"Use the v3 filter API version of the idle filters.";
const char* const additional_constraints_v3_channel_idle_filters = "{}";
const char* const description_v3_compression_filter =
"Use the compression filter utilizing the v3 filter api";
const char* const additional_constraints_v3_compression_filter = "{}";
const char* const description_work_serializer_clears_time_cache =
"Have the work serializer clear the time cache when it dispatches work.";
const char* const additional_constraints_work_serializer_clears_time_cache =
@ -797,6 +810,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
false, true},
{"v3_channel_idle_filters", description_v3_channel_idle_filters,
additional_constraints_v3_channel_idle_filters, nullptr, 0, false, true},
{"v3_compression_filter", description_v3_compression_filter,
additional_constraints_v3_compression_filter, nullptr, 0, false, true},
{"work_serializer_clears_time_cache",
description_work_serializer_clears_time_cache,
additional_constraints_work_serializer_clears_time_cache, nullptr, 0, true,

@ -108,6 +108,7 @@ inline bool IsTcpRcvLowatEnabled() { return false; }
inline bool IsTraceRecordCallopsEnabled() { return false; }
inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
inline bool IsV3ChannelIdleFiltersEnabled() { return false; }
inline bool IsV3CompressionFilterEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
@ -171,6 +172,7 @@ inline bool IsTcpRcvLowatEnabled() { return false; }
inline bool IsTraceRecordCallopsEnabled() { return false; }
inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
inline bool IsV3ChannelIdleFiltersEnabled() { return false; }
inline bool IsV3CompressionFilterEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
@ -234,6 +236,7 @@ inline bool IsTcpRcvLowatEnabled() { return false; }
inline bool IsTraceRecordCallopsEnabled() { return false; }
inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
inline bool IsV3ChannelIdleFiltersEnabled() { return false; }
inline bool IsV3CompressionFilterEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
@ -281,6 +284,7 @@ enum ExperimentIds {
kExperimentIdTraceRecordCallops,
kExperimentIdUnconstrainedMaxQuotaBufferSize,
kExperimentIdV3ChannelIdleFilters,
kExperimentIdV3CompressionFilter,
kExperimentIdWorkSerializerClearsTimeCache,
kExperimentIdWorkSerializerDispatch,
kExperimentIdWriteSizePolicy,
@ -424,6 +428,10 @@ inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() {
inline bool IsV3ChannelIdleFiltersEnabled() {
return IsExperimentEnabled(kExperimentIdV3ChannelIdleFilters);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_V3_COMPRESSION_FILTER
inline bool IsV3CompressionFilterEnabled() {
return IsExperimentEnabled(kExperimentIdV3CompressionFilter);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() {
return IsExperimentEnabled(kExperimentIdWorkSerializerClearsTimeCache);

@ -259,6 +259,12 @@
expiry: 2024/04/04
owner: ctiller@google.com
test_tags: []
- name: v3_compression_filter
description:
Use the compression filter utilizing the v3 filter api
expiry: 2024/04/04
owner: ctiller@google.com
test_tags: ["compression_test"]
- name: work_serializer_clears_time_cache
description:
Have the work serializer clear the time cache when it dispatches work.

@ -89,6 +89,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/http/client_authority_filter.cc',
'src/core/ext/filters/http/http_filters_plugin.cc',
'src/core/ext/filters/http/message_compress/compression_filter.cc',
'src/core/ext/filters/http/message_compress/legacy_compression_filter.cc',
'src/core/ext/filters/http/server/http_server_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/rbac/rbac_filter.cc',

@ -286,7 +286,10 @@ grpc_core_end2end_test(name = "channelz")
grpc_core_end2end_test(name = "client_streaming")
grpc_core_end2end_test(name = "compressed_payload")
grpc_core_end2end_test(
name = "compressed_payload",
tags = ["compression_test"],
)
grpc_core_end2end_test(name = "connectivity")

@ -25,7 +25,7 @@ END2END_TEST_DATA = [
"//src/core/tsi/test_creds:server1.pem",
]
def grpc_core_end2end_test(name, shard_count = 10):
def grpc_core_end2end_test(name, shard_count = 10, tags = []):
if len(name) > 60:
fail("test name %s too long" % name)
@ -45,7 +45,7 @@ def grpc_core_end2end_test(name, shard_count = 10):
"absl/types:optional",
"gtest",
],
tags = ["core_end2end_test"],
tags = ["core_end2end_test"] + tags,
deps = [
"end2end_test_main",
"cq_verifier",

@ -1215,6 +1215,8 @@ src/core/ext/filters/http/client_authority_filter.h \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/compression_filter.cc \
src/core/ext/filters/http/message_compress/compression_filter.h \
src/core/ext/filters/http/message_compress/legacy_compression_filter.cc \
src/core/ext/filters/http/message_compress/legacy_compression_filter.h \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/http/server/http_server_filter.h \
src/core/ext/filters/message_size/message_size_filter.cc \

@ -1024,6 +1024,8 @@ src/core/ext/filters/http/client_authority_filter.h \
src/core/ext/filters/http/http_filters_plugin.cc \
src/core/ext/filters/http/message_compress/compression_filter.cc \
src/core/ext/filters/http/message_compress/compression_filter.h \
src/core/ext/filters/http/message_compress/legacy_compression_filter.cc \
src/core/ext/filters/http/message_compress/legacy_compression_filter.h \
src/core/ext/filters/http/server/http_server_filter.cc \
src/core/ext/filters/http/server/http_server_filter.h \
src/core/ext/filters/message_size/message_size_filter.cc \

Loading…
Cancel
Save